Skip to content

Commit

Permalink
[flink][spark] Check Exists before create database to avoid Permissio…
Browse files Browse the repository at this point in the history
…n issue
  • Loading branch information
JingsongLi committed Dec 11, 2023
1 parent ac328f1 commit 38aa07b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,11 @@ public FlinkCatalog(
this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER);
this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
this.disableCreateTableInDefaultDatabase = options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
try {
this.catalog.createDatabase(defaultDatabase, true);
} catch (Catalog.DatabaseAlreadyExistException ignore) {
if (!catalog.databaseExists(defaultDatabase)) {
try {
catalog.createDatabase(defaultDatabase, true);
} catch (Catalog.DatabaseAlreadyExistException ignore) {
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
Options.fromMap(options),
SparkSession.active().sessionState().newHadoopConf());
this.catalog = CatalogFactory.createCatalog(catalogContext);
try {
createNamespace(defaultNamespace(), new HashMap<>());
} catch (NamespaceAlreadyExistsException ignored) {
if (!catalog.databaseExists(defaultNamespace()[0])) {
try {
createNamespace(defaultNamespace(), new HashMap<>());
} catch (NamespaceAlreadyExistsException ignored) {
}
}
}

Expand Down

0 comments on commit 38aa07b

Please sign in to comment.