Skip to content

Commit

Permalink
[flink] create table validate options
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhuoyu committed Aug 6, 2024
1 parent 8aac48e commit 5eee069
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,13 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private void validateOptions(Map<String, String> options) {
private static void validateOptions(Map<String, String> options) {
Set<String> validOptions =
Stream.of(CoreOptions.class.getFields())
Stream.of(
CoreOptions.class.getFields(),
FlinkConnectorOptions.class.getFields(),
FlinkCatalogOptions.class.getFields())
.flatMap(Stream::of)
.filter(field -> java.lang.reflect.Modifier.isStatic(field.getModifiers()))
.filter(field -> ConfigOption.class.isAssignableFrom(field.getType()))
.map(
Expand All @@ -358,7 +362,9 @@ private void validateOptions(Map<String, String> options) {
"log.system",
"kafka.bootstrap.servers",
"kafka.topic",
"kafka.transaction.timeout.ms"));
"kafka.transaction.timeout.ms",
"fields.cnt.aggregate-function",
"kafka.batch.size"));

options.keySet()
.forEach(
Expand Down

0 comments on commit 5eee069

Please sign in to comment.