diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 74302a3dc792..b2370e5c283a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -37,11 +37,13 @@
 import java.util.Objects;
 import java.util.Optional;
 
+import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.SCAN_MODE;
 import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
 import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
 import static org.apache.paimon.WriteMode.APPEND_ONLY;
 import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
 import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
@@ -78,6 +80,14 @@ public static void validateTableSchema(TableSchema schema) {
             checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
         }
 
+        if (options.writeMode() == WriteMode.APPEND_ONLY
+                && options.changelogProducer() != CoreOptions.ChangelogProducer.NONE) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Can not set the %s to %s and %s at the same time.",
+                            WRITE_MODE.key(), APPEND_ONLY, CHANGELOG_PRODUCER.key()));
+        }
+
         checkArgument(
                 options.snapshotNumRetainMin() > 0,
                 SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 005fc39b27c2..90edfd2cbb79 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -292,6 +292,25 @@ public void testCreateTableAs() throws Exception {
                 "Cannot define partition on DDL and table options at the same time.");
     }
 
+    @Test
+    public void testConflictOption() {
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CREATE TABLE T (a INT) WITH ('write-mode' = 'append-only', 'changelog-producer' = 'input')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Can not set the write-mode to append-only and changelog-producer at the same time.");
+
+        sql("CREATE TABLE T (a INT) WITH ('write-mode' = 'append-only')");
+        assertThatThrownBy(() -> sql("ALTER TABLE T SET ('changelog-producer'='input')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Can not set the write-mode to append-only and changelog-producer at the same time.");
+    }
+
     @Test
     public void testFilesTable() throws Exception {
         sql(
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index bbe5f2a56355..1ffb0d9ab3df 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -265,6 +265,29 @@ public void testCreateTableAs() {
                 .containsExactlyInAnyOrder("[1,2,bbb,2020-01-01,12]");
     }
 
+    @Test
+    public void testConflictOption() {
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "CREATE TABLE T (a INT) TBLPROPERTIES ('write-mode' = 'append-only', 'changelog-producer' = 'input')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Can not set the write-mode to append-only and changelog-producer at the same time.");
+
+        spark.sql("CREATE TABLE T (a INT) TBLPROPERTIES ('write-mode' = 'append-only')");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE T SET TBLPROPERTIES('changelog-producer' = 'input')"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Can not set the write-mode to append-only and changelog-producer at the same time.");
+    }
+
     @Test
     public void testCreateTableWithNullablePk() {
         spark.sql(