Skip to content

Commit

Permalink
[flink] Do not create default database if ddl in it is disable (#2500)
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs authored Dec 13, 2023
1 parent 36d5318 commit c4f74c4
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ public FlinkCatalog(
this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER);
this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
this.disableCreateTableInDefaultDatabase = options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
if (!catalog.databaseExists(defaultDatabase)) {
try {
catalog.createDatabase(defaultDatabase, true);
} catch (Catalog.DatabaseAlreadyExistException ignore) {
if (!disableCreateTableInDefaultDatabase) {
if (!catalog.databaseExists(defaultDatabase)) {
try {
catalog.createDatabase(defaultDatabase, true);
} catch (Catalog.DatabaseAlreadyExistException ignore) {
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatCollection;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link FlinkCatalog}. */
Expand Down Expand Up @@ -536,10 +537,12 @@ public void testDisableCreateTableInDefaultDB()
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(
"Creating table in default database is disabled, please specify a database name.");
assertThatCollection(catalog.listDatabases()).isEmpty();

catalog.createDatabase("db1", null, false);
assertThatCode(() -> catalog.createTable(path1, this.createTable(new HashMap<>(0)), false))
.doesNotThrowAnyException();
assertThat(catalog.listDatabases()).containsExactlyInAnyOrder("db1");

conf.set(FlinkCatalogOptions.DEFAULT_DATABASE, "default-db");
Catalog catalog1 =
Expand Down

0 comments on commit c4f74c4

Please sign in to comment.