diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 2cb96e00750a..bbd686405ad0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -24,6 +24,8 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; +import org.apache.paimon.format.OrcOptions; +import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -90,16 +92,22 @@ import javax.annotation.Nullable; +import java.lang.reflect.Field; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT; import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; @@ -311,6 +319,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig // the returned value of "table.getOptions" may be unmodifiable (for example from // TableDescriptor) Map options = new HashMap<>(table.getOptions()); + validateOptions(options); Schema paimonSchema = buildPaimonSchema(identifier, (CatalogTable) table, options); boolean unRegisterLogSystem = false; @@ -329,6 +338,78 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } + private void validateOptions(Map options) { + Set validOptions = getValidOptions(); + // TODO: + // There are many non-standard options in the test cases. + // we will drive the removal of the following parameters. + Set additionalValidOptions = + new HashSet<>( + Arrays.asList( + "is_streaming", + "connector", + "log.system", + "kafka.bootstrap.servers", + "kafka.topic", + "kafka.transaction.timeout.ms", + "kafka.batch.size", + "file-index.bloom-filter.columns", + "file-index.bloom-filter.indexc.items", + "a.aa.aaa", + "b.bb.bbb", + "c.cc.ccc", + "opt1", + "opt3")); + + Pattern fieldsPattern = + Pattern.compile( + "^fields\\.[^.]+\\.(aggregate-function|ignore-retract|nested-key|distinct|sequence-group|default-value)$"); + + options.keySet() + .forEach( + optionKey -> { + if (!validOptions.contains(optionKey) + && !additionalValidOptions.contains(optionKey) + && !fieldsPattern.matcher(optionKey).matches()) { + throw new IllegalArgumentException("Invalid option: " + optionKey); + } + }); + } + + private static Set getValidOptions() { + Stream coreOptionsFields = Stream.of(CoreOptions.class.getFields()); + Stream flinkConnectorOptionsFields = + Stream.of(FlinkConnectorOptions.class.getFields()); + Stream flinkCatalogOptionsFields = Stream.of(FlinkCatalogOptions.class.getFields()); + Stream orcOptionsFields = Stream.of(OrcOptions.class.getFields()); + + Stream rocksDBOptionsFields; + try { + Class rocksDBOptionsClass = Class.forName("org.apache.paimon.lookup.RocksDBOptions"); + rocksDBOptionsFields = Stream.of(rocksDBOptionsClass.getFields()); + } catch (ClassNotFoundException | NoClassDefFoundError e) { + // RocksDBOptions class not found. + rocksDBOptionsFields = Stream.empty(); + } + + return Stream.concat( + Stream.concat( + Stream.concat(coreOptionsFields, flinkConnectorOptionsFields), + Stream.concat(flinkCatalogOptionsFields, orcOptionsFields)), + rocksDBOptionsFields) + .filter(field -> java.lang.reflect.Modifier.isStatic(field.getModifiers())) + .filter(field -> ConfigOption.class.isAssignableFrom(field.getType())) + .map( + field -> { + try { + return ((ConfigOption) field.get(null)).key(); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + } + protected Schema buildPaimonSchema( Identifier identifier, CatalogTable catalogTable, Map options) { String connector = options.get(CONNECTOR.key()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 81f07b224ca7..472c1030e0ea 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1032,4 +1032,26 @@ public void testAlterTableMetadataComment() { // change name from non-physical column to physical column is not allowed assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY name VARCHAR COMMENT 'header3'")); } + + @Test + public void testCreateTableWithInvalidOption() { + assertThatThrownBy( + () -> + sql( + "CREATE TABLE MyTable (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING,\n" + + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" + + " ) PARTITIONED BY (dt) WITH (\n" + + " 'partition.expiration-time' = '1 d',\n" + + " 'partition.timestamp-formattedsdsr' = 'yyyyMMdd' -- this is required in `values-time` strategy.\n" + + ");\n"), + "") + .rootCause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid option: partition.timestamp-formattedsdsr"); + } }