Skip to content

Commit

Permalink
[flink] create table validate options
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhuoyu committed Aug 6, 2024
1 parent edda6b9 commit 4b4becd
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.paimon.flink;

import java.util.Arrays;
import java.util.HashSet;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
Expand Down Expand Up @@ -95,8 +93,10 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -350,13 +350,17 @@ private void validateOptions(Map<String, String> options) {
})
.collect(Collectors.toSet());

Set<String> additionalValidOptions = new HashSet<>(Arrays.asList("is_streaming", "connector", "log.system"));
Set<String> additionalValidOptions =
new HashSet<>(Arrays.asList("is_streaming", "connector", "log.system"));

options.keySet().forEach(optionKey -> {
if (!validOptions.contains(optionKey) && !additionalValidOptions.contains(optionKey)) {
throw new IllegalArgumentException("Invalid option: " + optionKey);
}
});
options.keySet()
.forEach(
optionKey -> {
if (!validOptions.contains(optionKey)
&& !additionalValidOptions.contains(optionKey)) {
throw new IllegalArgumentException("Invalid option: " + optionKey);
}
});
}

protected Schema buildPaimonSchema(
Expand Down

0 comments on commit 4b4becd

Please sign in to comment.