From 0ce3be0d106a4c76ead5dd5b8b279e6474a4ae44 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Thu, 26 Oct 2023 17:42:09 +0800 Subject: [PATCH 1/2] [core] Add validation to forbid setting stream-read-overwrite with 'full-compaction' or 'lookup' changelog producer --- .../org/apache/paimon/schema/SchemaValidation.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 440a8dc97a06..2f5f572af6b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -53,6 +53,7 @@ import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN; +import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE; import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX; import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -87,6 +88,16 @@ public static void validateTableSchema(TableSchema schema) { "Can not set %s on table without primary keys, please define primary keys.", CHANGELOG_PRODUCER.key())); } + if (options.streamingReadOverwrite() + && (changelogProducer == ChangelogProducer.FULL_COMPACTION + || changelogProducer == ChangelogProducer.LOOKUP)) { + throw new UnsupportedOperationException( + String.format( + "Cannot set %s to true when changelog producer is %s or %s.", + STREAMING_READ_OVERWRITE.key(), + ChangelogProducer.FULL_COMPACTION, + ChangelogProducer.LOOKUP)); + } checkArgument( options.snapshotNumRetainMin() > 0, From 175294c428c1abe3c95680c1b2efa45f3c31427c Mon Sep 17 00:00:00 2001 From: yuzelin Date: Fri, 27 Oct 2023 16:40:15 +0800 Subject: [PATCH 2/2] address comments --- .../generated/core_configuration.html | 2 +- .../java/org/apache/paimon/CoreOptions.java | 3 ++- .../paimon/schema/SchemaValidation.java | 2 +- .../paimon/flink/CatalogTableITCase.java | 22 +++++++++++++++++++ 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 0be1a41c667e..fba0ce19d938 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -561,7 +561,7 @@
streaming-read-overwrite
false Boolean - Whether to read the changes from overwrite in streaming mode. + Whether to read the changes from overwrite in streaming mode. Cannot be set to true when changelog producer is full-compaction or lookup because it will read duplicated changes.
tag.automatic-creation
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 88243fc6b9b1..643aee75fc54 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -514,7 +514,8 @@ public class CoreOptions implements Serializable { .booleanType() .defaultValue(false) .withDescription( - "Whether to read the changes from overwrite in streaming mode."); + "Whether to read the changes from overwrite in streaming mode. Cannot be set to true when " + + "changelog producer is full-compaction or lookup because it will read duplicated changes."); public static final ConfigOption DYNAMIC_PARTITION_OVERWRITE = key("dynamic-partition-overwrite") diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 2f5f572af6b0..ed3f28e57473 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -93,7 +93,7 @@ public static void validateTableSchema(TableSchema schema) { || changelogProducer == ChangelogProducer.LOOKUP)) { throw new UnsupportedOperationException( String.format( - "Cannot set %s to true when changelog producer is %s or %s.", + "Cannot set %s to true when changelog producer is %s or %s because it will read duplicated changes.", STREAMING_READ_OVERWRITE.key(), ChangelogProducer.FULL_COMPACTION, ChangelogProducer.LOOKUP)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index ce21b5035447..a17725605275 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -25,6 +25,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.system.AllTableOptionsTable; import org.apache.paimon.table.system.CatalogOptionsTable; +import org.apache.paimon.testutils.assertj.AssertionUtils; import org.apache.paimon.types.IntType; import org.apache.paimon.utils.BlockingIterator; @@ -682,4 +683,25 @@ private void assertPartitionsTable(String tableName) throws Exception { assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]", "[3]", "[4]"); } } + + @Test + public void testInvalidStreamingReadOverwrite() { + String ddl = + "CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b STRING)" + + "WITH ('changelog-producer' = '%s', 'streaming-read-overwrite' = 'true')"; + + assertThatThrownBy(() -> sql(ddl, "full-compaction")) + .satisfies( + AssertionUtils.anyCauseMatches( + UnsupportedOperationException.class, + "Cannot set streaming-read-overwrite to true when changelog producer " + + "is full-compaction or lookup because it will read duplicated changes.")); + + assertThatThrownBy(() -> sql(ddl, "lookup")) + .satisfies( + AssertionUtils.anyCauseMatches( + UnsupportedOperationException.class, + "Cannot set streaming-read-overwrite to true when changelog producer " + + "is full-compaction or lookup because it will read duplicated changes.")); + } }