Skip to content

Commit

Permalink
[PAIMON-3219][Flink] support create database with properties when inn…
Browse files Browse the repository at this point in the history
…er catalog supports.
  • Loading branch information
wenchao.wu committed Apr 17, 2024
1 parent c05cbca commit 988369e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,14 @@ default boolean caseSensitive() {
return true;
}

/**
* Return a boolean that indicates whether this catalog supports create database with
* properties.
*/
default boolean supportDatabaseProperties() {
return false;
}

/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ public Optional<CatalogLockContext> lockContext() {
return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options));
}

@Override
public boolean supportDatabaseProperties() {
return true;
}

private Lock lock(Identifier identifier) {
if (!lockEnabled()) {
return new Lock.EmptyLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,22 +191,23 @@ public CatalogDatabase getDatabase(String databaseName)
@Override
public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
// todo: flink hive catalog support create db with props
if (database != null) {
if (database.getProperties().size() > 0) {
throw new UnsupportedOperationException(
"Create database with properties is unsupported.");
}
try {
if (database != null) {
if (database.getProperties().size() > 0 && !catalog.supportDatabaseProperties()) {
throw new UnsupportedOperationException(
"Create database with properties is unsupported.");
}

if (database.getDescription().isPresent()
&& !database.getDescription().get().equals("")) {
throw new UnsupportedOperationException(
"Create database with description is unsupported.");
}
}
if (database.getDescription().isPresent()
&& !database.getDescription().get().equals("")) {
throw new UnsupportedOperationException(
"Create database with description is unsupported.");
}

try {
catalog.createDatabase(name, ignoreIfExists);
catalog.createDatabase(name, ignoreIfExists, database.getProperties());
} else {
catalog.createDatabase(name, ignoreIfExists);
}
} catch (Catalog.DatabaseAlreadyExistException e) {
throw new DatabaseAlreadyExistException(getName(), e.database());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ protected void createDatabaseImpl(String name, Map<String, String> properties) {
}
}

@Override
public boolean supportDatabaseProperties() {
return true;
}

private Database convertToHiveDatabase(String name, Map<String, String> properties) {
Database database = new Database();
database.setName(name);
Expand Down

0 comments on commit 988369e

Please sign in to comment.