diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 0be1a41c667e..fba0ce19d938 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -561,7 +561,7 @@
streaming-read-overwrite |
false |
Boolean |
- Whether to read the changes from overwrite in streaming mode. |
+ Whether to read the changes from overwrite in streaming mode. Cannot be set to true when changelog producer is full-compaction or lookup because it will read duplicated changes. |
tag.automatic-creation |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 88243fc6b9b1..643aee75fc54 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -514,7 +514,8 @@ public class CoreOptions implements Serializable {
.booleanType()
.defaultValue(false)
.withDescription(
- "Whether to read the changes from overwrite in streaming mode.");
+ "Whether to read the changes from overwrite in streaming mode. Cannot be set to true when "
+ + "changelog producer is full-compaction or lookup because it will read duplicated changes.");
public static final ConfigOption DYNAMIC_PARTITION_OVERWRITE =
key("dynamic-partition-overwrite")
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 440a8dc97a06..ed3f28e57473 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -53,6 +53,7 @@
import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
+import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -87,6 +88,16 @@ public static void validateTableSchema(TableSchema schema) {
"Can not set %s on table without primary keys, please define primary keys.",
CHANGELOG_PRODUCER.key()));
}
+ if (options.streamingReadOverwrite()
+ && (changelogProducer == ChangelogProducer.FULL_COMPACTION
+ || changelogProducer == ChangelogProducer.LOOKUP)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot set %s to true when changelog producer is %s or %s because it will read duplicated changes.",
+ STREAMING_READ_OVERWRITE.key(),
+ ChangelogProducer.FULL_COMPACTION,
+ ChangelogProducer.LOOKUP));
+ }
checkArgument(
options.snapshotNumRetainMin() > 0,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index ce21b5035447..a17725605275 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -25,6 +25,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.IntType;
import org.apache.paimon.utils.BlockingIterator;
@@ -682,4 +683,25 @@ private void assertPartitionsTable(String tableName) throws Exception {
assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]", "[3]", "[4]");
}
}
+
+ @Test
+ public void testInvalidStreamingReadOverwrite() {
+ String ddl =
+ "CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b STRING)"
+ + "WITH ('changelog-producer' = '%s', 'streaming-read-overwrite' = 'true')";
+
+ assertThatThrownBy(() -> sql(ddl, "full-compaction"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot set streaming-read-overwrite to true when changelog producer "
+ + "is full-compaction or lookup because it will read duplicated changes."));
+
+ assertThatThrownBy(() -> sql(ddl, "lookup"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot set streaming-read-overwrite to true when changelog producer "
+ + "is full-compaction or lookup because it will read duplicated changes."));
+ }
}