diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index eeae7331efba..c854be93a508 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -148,10 +148,12 @@ public FlinkCatalog( this.logStoreAutoRegister = options.get(LOG_SYSTEM_AUTO_REGISTER); this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT); this.disableCreateTableInDefaultDatabase = options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB); - if (!catalog.databaseExists(defaultDatabase)) { - try { - catalog.createDatabase(defaultDatabase, true); - } catch (Catalog.DatabaseAlreadyExistException ignore) { + if (!disableCreateTableInDefaultDatabase) { + if (!catalog.databaseExists(defaultDatabase)) { + try { + catalog.createDatabase(defaultDatabase, true); + } catch (Catalog.DatabaseAlreadyExistException ignore) { + } } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index cdca5e7ea426..7f60a2b67529 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -79,6 +79,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatCollection; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link FlinkCatalog}. */ @@ -536,10 +537,12 @@ public void testDisableCreateTableInDefaultDB() .isInstanceOf(UnsupportedOperationException.class) .hasMessage( "Creating table in default database is disabled, please specify a database name."); + assertThatCollection(catalog.listDatabases()).isEmpty(); catalog.createDatabase("db1", null, false); assertThatCode(() -> catalog.createTable(path1, this.createTable(new HashMap<>(0)), false)) .doesNotThrowAnyException(); + assertThat(catalog.listDatabases()).containsExactlyInAnyOrder("db1"); conf.set(FlinkCatalogOptions.DEFAULT_DATABASE, "default-db"); Catalog catalog1 =