Skip to content

Commit

Permalink
[flink] create table validate options
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhuoyu committed Aug 7, 2024
1 parent b194e19 commit b0adcdf
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.format.OrcOptions;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -90,16 +92,22 @@

import javax.annotation.Nullable;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT;
import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
Expand Down Expand Up @@ -311,6 +319,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
// the returned value of "table.getOptions" may be unmodifiable (for example from
// TableDescriptor)
Map<String, String> options = new HashMap<>(table.getOptions());
validateOptions(options);
Schema paimonSchema = buildPaimonSchema(identifier, (CatalogTable) table, options);

boolean unRegisterLogSystem = false;
Expand All @@ -329,6 +338,77 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private void validateOptions(Map<String, String> options) {
Set<String> validOptions = getValidOptions();
// TODO:
// There are many non-standard options in the test cases.
// we will drive the removal of the following parameters.
Set<String> additionalValidOptions =
new HashSet<>(
Arrays.asList(
"is_streaming",
"connector",
"log.system",
"kafka.bootstrap.servers",
"kafka.topic",
"kafka.transaction.timeout.ms",
"kafka.batch.size",
"file-index.bloom-filter.columns",
"file-index.bloom-filter.indexc.items",
"a.aa.aaa",
"b.bb.bbb",
"c.cc.ccc",
"opt1",
"opt3"));

Pattern fieldsPattern =
Pattern.compile(
"^fields\\.[^.]+\\.(aggregate-function|ignore-retract|nested-key|distinct|sequence-group|default-value)$");

options.keySet()
.forEach(
optionKey -> {
if (!validOptions.contains(optionKey)
&& !additionalValidOptions.contains(optionKey)
&& !fieldsPattern.matcher(optionKey).matches()) {
throw new IllegalArgumentException("Invalid option: " + optionKey);
}
});
}

private static Set<String> getValidOptions() {
Stream<Field> coreOptionsFields = Stream.of(CoreOptions.class.getFields());
Stream<Field> flinkConnectorOptionsFields =
Stream.of(FlinkConnectorOptions.class.getFields());
Stream<Field> flinkCatalogOptionsFields = Stream.of(FlinkCatalogOptions.class.getFields());
Stream<Field> orcOptionsFields = Stream.of(OrcOptions.class.getFields());

Stream<Field> rocksDBOptionsFields;
try {
Class<?> rocksDBOptionsClass = Class.forName("org.apache.paimon.lookup.RocksDBOptions");
rocksDBOptionsFields = Stream.of(rocksDBOptionsClass.getFields());
} catch (ClassNotFoundException e) {
rocksDBOptionsFields = Stream.empty();
}

return Stream.concat(
Stream.concat(
Stream.concat(coreOptionsFields, flinkConnectorOptionsFields),
Stream.concat(flinkCatalogOptionsFields, orcOptionsFields)),
rocksDBOptionsFields)
.filter(field -> java.lang.reflect.Modifier.isStatic(field.getModifiers()))
.filter(field -> ConfigOption.class.isAssignableFrom(field.getType()))
.map(
field -> {
try {
return ((ConfigOption<?>) field.get(null)).key();
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toSet());
}

protected Schema buildPaimonSchema(
Identifier identifier, CatalogTable catalogTable, Map<String, String> options) {
String connector = options.get(CONNECTOR.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,4 +1032,26 @@ public void testAlterTableMetadataComment() {
// change name from non-physical column to physical column is not allowed
assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY name VARCHAR COMMENT 'header3'"));
}

@Test
public void testCreateTableWithInvalidOption() {
assertThatThrownBy(
() ->
sql(
"CREATE TABLE MyTable (\n"
+ " user_id BIGINT,\n"
+ " item_id BIGINT,\n"
+ " behavior STRING,\n"
+ " dt STRING,\n"
+ " hh STRING,\n"
+ " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+ " ) PARTITIONED BY (dt) WITH (\n"
+ " 'partition.expiration-time' = '1 d',\n"
+ " 'partition.timestamp-formattedsdsr' = 'yyyyMMdd' -- this is required in `values-time` strategy.\n"
+ ");\n"),
"")
.rootCause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid option: partition.timestamp-formattedsdsr");
}
}

0 comments on commit b0adcdf

Please sign in to comment.