Skip to content

Commit

Permalink
[flink] create table validate options
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhuoyu committed Aug 6, 2024
1 parent b194e19 commit a2696e5
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -92,14 +93,19 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT;
import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
Expand Down Expand Up @@ -311,6 +317,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
// the returned value of "table.getOptions" may be unmodifiable (for example from
// TableDescriptor)
Map<String, String> options = new HashMap<>(table.getOptions());
validateOptions(options);
Schema paimonSchema = buildPaimonSchema(identifier, (CatalogTable) table, options);

boolean unRegisterLogSystem = false;
Expand All @@ -329,6 +336,50 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private static void validateOptions(Map<String, String> options) {
Set<String> validOptions =
Stream.of(
CoreOptions.class.getFields(),
FlinkConnectorOptions.class.getFields(),
FlinkCatalogOptions.class.getFields())
.flatMap(Stream::of)
.filter(field -> java.lang.reflect.Modifier.isStatic(field.getModifiers()))
.filter(field -> ConfigOption.class.isAssignableFrom(field.getType()))
.map(
field -> {
try {
return ((ConfigOption<?>) field.get(null)).key();
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toSet());

Set<String> additionalValidOptions =
new HashSet<>(
Arrays.asList(
"is_streaming",
"connector",
"log.system",
"kafka.bootstrap.servers",
"kafka.topic",
"kafka.transaction.timeout.ms",
"fields.cnt.aggregate-function",
"kafka.batch.size"));

Pattern fieldsPattern = Pattern.compile("^fields\\..+");

options.keySet()
.forEach(
optionKey -> {
if (!validOptions.contains(optionKey)
&& !additionalValidOptions.contains(optionKey)
&& !fieldsPattern.matcher(optionKey).matches()) {
throw new IllegalArgumentException("Invalid option: " + optionKey);
}
});
}

protected Schema buildPaimonSchema(
Identifier identifier, CatalogTable catalogTable, Map<String, String> options) {
String connector = options.get(CONNECTOR.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,4 +1032,26 @@ public void testAlterTableMetadataComment() {
// change name from non-physical column to physical column is not allowed
assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY name VARCHAR COMMENT 'header3'"));
}

@Test
public void testCreateTableWithInvalidOption() {
assertThatThrownBy(
() ->
sql(
"CREATE TABLE MyTable (\n"
+ " user_id BIGINT,\n"
+ " item_id BIGINT,\n"
+ " behavior STRING,\n"
+ " dt STRING,\n"
+ " hh STRING,\n"
+ " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+ " ) PARTITIONED BY (dt) WITH (\n"
+ " 'partition.expiration-time' = '1 d',\n"
+ " 'partition.timestamp-formattedsdsr' = 'yyyyMMdd' -- this is required in `values-time` strategy.\n"
+ ");\n"),
"")
.rootCause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid option: partition.timestamp-formattedsdsr");
}
}

0 comments on commit a2696e5

Please sign in to comment.