From a2696e5f337500ac6703d43d8fafe17686382771 Mon Sep 17 00:00:00 2001 From: chenzhuoyu Date: Tue, 6 Aug 2024 22:14:46 +0800 Subject: [PATCH] [flink] create table validate options --- .../org/apache/paimon/flink/FlinkCatalog.java | 51 +++++++++++++++++++ .../paimon/flink/SchemaChangeITCase.java | 22 ++++++++ 2 files changed, 73 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 2cb96e00750ab..7795ad1c73374 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; +import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -92,14 +93,19 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT; import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; @@ -311,6 +317,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig // the returned value of "table.getOptions" may be unmodifiable (for example from // TableDescriptor) Map options = new HashMap<>(table.getOptions()); + validateOptions(options); Schema paimonSchema = buildPaimonSchema(identifier, (CatalogTable) table, options); boolean unRegisterLogSystem = false; @@ -329,6 +336,50 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } + private static void validateOptions(Map options) { + Set validOptions = + Stream.of( + CoreOptions.class.getFields(), + FlinkConnectorOptions.class.getFields(), + FlinkCatalogOptions.class.getFields()) + .flatMap(Stream::of) + .filter(field -> java.lang.reflect.Modifier.isStatic(field.getModifiers())) + .filter(field -> ConfigOption.class.isAssignableFrom(field.getType())) + .map( + field -> { + try { + return ((ConfigOption) field.get(null)).key(); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + + Set additionalValidOptions = + new HashSet<>( + Arrays.asList( + "is_streaming", + "connector", + "log.system", + "kafka.bootstrap.servers", + "kafka.topic", + "kafka.transaction.timeout.ms", + "fields.cnt.aggregate-function", + "kafka.batch.size")); + + Pattern fieldsPattern = Pattern.compile("^fields\\..+"); + + options.keySet() + .forEach( + optionKey -> { + if (!validOptions.contains(optionKey) + && !additionalValidOptions.contains(optionKey) + && !fieldsPattern.matcher(optionKey).matches()) { + throw new IllegalArgumentException("Invalid option: " + optionKey); + } + }); + } + protected Schema buildPaimonSchema( Identifier identifier, CatalogTable catalogTable, Map options) { String connector = options.get(CONNECTOR.key()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 81f07b224ca70..472c1030e0ea1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1032,4 +1032,26 @@ public void testAlterTableMetadataComment() { // change name from non-physical column to physical column is not allowed assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY name VARCHAR COMMENT 'header3'")); } + + @Test + public void testCreateTableWithInvalidOption() { + assertThatThrownBy( + () -> + sql( + "CREATE TABLE MyTable (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING,\n" + + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" + + " ) PARTITIONED BY (dt) WITH (\n" + + " 'partition.expiration-time' = '1 d',\n" + + " 'partition.timestamp-formattedsdsr' = 'yyyyMMdd' -- this is required in `values-time` strategy.\n" + + ");\n"), + "") + .rootCause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid option: partition.timestamp-formattedsdsr"); + } }