Skip to content

Commit

Permalink
[core][spark] Support create database with location, comment, props using hive catalog (apache#2698)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Jan 16, 2024
1 parent 1028145 commit f0f653e
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public abstract class AbstractCatalog implements Catalog {

public static final String DB_SUFFIX = ".db";
protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
protected static final String DB_LOCATION_PROP = "location";

protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
Expand Down Expand Up @@ -94,7 +95,7 @@ public boolean databaseExists(String databaseName) {
protected abstract boolean databaseExistsImpl(String databaseName);

@Override
public void createDatabase(String name, boolean ignoreIfExists)
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
if (isSystemDatabase(name)) {
throw new ProcessSystemDatabaseException();
Expand All @@ -105,10 +106,23 @@ public void createDatabase(String name, boolean ignoreIfExists)
}
throw new DatabaseAlreadyExistException(name);
}
createDatabaseImpl(name, properties);
}

createDatabaseImpl(name);
@Override
public Map<String, String> loadDatabaseProperties(String name)
        throws DatabaseNotExistException {
    // System databases are virtual and carry no user-defined properties.
    if (isSystemDatabase(name)) {
        return Collections.emptyMap();
    }
    if (databaseExists(name)) {
        return loadDatabasePropertiesImpl(name);
    }
    throw new DatabaseNotExistException(name);
}

protected abstract Map<String, String> loadDatabasePropertiesImpl(String name);

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
Expand All @@ -119,7 +133,7 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSp
Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
}

protected abstract void createDatabaseImpl(String name);
protected abstract void createDatabaseImpl(String name, Map<String, String> properties);

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
Expand Down
24 changes: 22 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,36 @@ default Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier iden
boolean databaseExists(String databaseName);

/**
 * Create a database with no extra properties, delegating to {@link #createDatabase(String,
 * boolean, Map)}.
 *
 * @param name Name of the database to be created
 * @param ignoreIfExists Flag to specify behavior when a database with the given name already
 *     exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do
 *     nothing.
 * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists
 *     is false
 */
default void createDatabase(String name, boolean ignoreIfExists)
        throws DatabaseAlreadyExistException {
    // Javadoc member references must use parameter types only; the previous
    // {@link Catalog#createDatabase(String name, ...)} form does not resolve.
    createDatabase(name, ignoreIfExists, Collections.emptyMap());
}

/**
 * Create a database with properties.
 *
 * @param name Name of the database to be created
 * @param ignoreIfExists Flag to specify behavior when a database with the given name already
 *     exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do
 *     nothing.
 * @param properties properties to be associated with the database
 * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists
 *     is false
 */
void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
        throws DatabaseAlreadyExistException;

/**
 * Load database properties. Implementations without a property store may return an empty map.
 *
 * @param name Database name
 * @return The requested database's properties
 * @throws DatabaseNotExistException if the requested database does not exist
 */
Map<String, String> loadDatabaseProperties(String name) throws DatabaseNotExistException;

/**
* Drop a database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

Expand All @@ -37,6 +42,8 @@
/** A catalog implementation for {@link FileIO}. */
public class FileSystemCatalog extends AbstractCatalog {

private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalog.class);

private final Path warehouse;

public FileSystemCatalog(FileIO fileIO, Path warehouse) {
Expand Down Expand Up @@ -72,10 +79,24 @@ protected boolean databaseExistsImpl(String databaseName) {
}

@Override
protected void createDatabaseImpl(String name) {
protected void createDatabaseImpl(String name, Map<String, String> properties) {
if (properties.containsKey(AbstractCatalog.DB_LOCATION_PROP)) {
throw new IllegalArgumentException(
"Cannot specify location for a database when using fileSystem catalog.");
}
if (!properties.isEmpty()) {
LOG.warn(
"Currently filesystem catalog can't store database properties, discard properties: {}",
properties);
}
uncheck(() -> fileIO.mkdirs(newDatabasePath(name)));
}

@Override
public Map<String, String> loadDatabasePropertiesImpl(String name) {
    // The filesystem catalog persists no database properties (they are
    // discarded at creation time), so there is nothing to load.
    return Collections.emptyMap();
}

@Override
protected void dropDatabaseImpl(String name) {
uncheck(() -> fileIO.delete(newDatabasePath(name), true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public CatalogDatabase getDatabase(String databaseName)
@Override
public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
// TODO: support creating a database with properties in the Flink Hive catalog
if (database != null) {
if (database.getProperties().size() > 0) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@
public class HiveCatalog extends AbstractCatalog {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);

// we don't include paimon-hive-connector as dependencies because it depends on
// hive-exec
// Reserved properties
public static final String DB_COMMENT_PROP = "comment";
public static final String TABLE_TYPE_PROP = "table_type";
public static final String PAIMON_TABLE_TYPE_VALUE = "paimon";

// we don't include paimon-hive-connector as dependencies because it depends on
// hive-exec
private static final String INPUT_FORMAT_CLASS_NAME =
"org.apache.paimon.hive.mapred.PaimonInputFormat";
private static final String OUTPUT_FORMAT_CLASS_NAME =
Expand Down Expand Up @@ -219,20 +221,60 @@ protected boolean databaseExistsImpl(String databaseName) {
}

@Override
protected void createDatabaseImpl(String name) {
protected void createDatabaseImpl(String name, Map<String, String> properties) {
try {
Path databasePath = newDatabasePath(name);
Database database = convertToHiveDatabase(name, properties);
Path databasePath =
database.getLocationUri() == null
? newDatabasePath(name)
: new Path(database.getLocationUri());
locationHelper.createPathIfRequired(databasePath, fileIO);

Database database = new Database();
database.setName(name);
locationHelper.specifyDatabaseLocation(databasePath, database);
client.createDatabase(database);
} catch (TException | IOException e) {
throw new RuntimeException("Failed to create database " + name, e);
}
}

/**
 * Builds a Hive {@code Database} from the given properties: the reserved {@code comment} and
 * {@code location} keys map onto the description and location URI, everything else (non-null)
 * becomes a Hive parameter.
 */
private Database convertToHiveDatabase(String name, Map<String, String> properties) {
    Database database = new Database();
    database.setName(name);
    Map<String, String> parameters = new HashMap<>();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
        String key = entry.getKey();
        String value = entry.getValue();
        switch (key) {
            case DB_COMMENT_PROP:
                database.setDescription(value);
                break;
            case DB_LOCATION_PROP:
                database.setLocationUri(value);
                break;
            default:
                if (value != null) {
                    parameters.put(key, value);
                }
        }
    }
    database.setParameters(parameters);
    return database;
}

@Override
public Map<String, String> loadDatabasePropertiesImpl(String name) {
    // Fetch the database from the Hive metastore and flatten it (parameters +
    // reserved location/comment keys) into a plain property map.
    try {
        return convertToProperties(client.getDatabase(name));
    } catch (TException e) {
        // Wrap the Thrift failure, preserving it as the cause.
        throw new RuntimeException(
                String.format("Failed to get database %s properties", name), e);
    }
}

/**
 * Converts a Hive {@link Database} into a flat property map, folding the location URI and
 * description into the reserved {@code location} / {@code comment} keys.
 */
private Map<String, String> convertToProperties(Database database) {
    // Thrift-generated getters return null when the field was never set, so
    // guard the copy constructor against a null parameter map.
    Map<String, String> parameters = database.getParameters();
    Map<String, String> properties =
            parameters == null ? new HashMap<>() : new HashMap<>(parameters);
    if (database.getLocationUri() != null) {
        properties.put(DB_LOCATION_PROP, database.getLocationUri());
    }
    if (database.getDescription() != null) {
        properties.put(DB_COMMENT_PROP, database.getDescription());
    }
    return properties;
}

@Override
protected void dropDatabaseImpl(String name) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void createNamespace(String[] namespace, Map<String, String> metadata)
"Namespace %s is not valid",
Arrays.toString(namespace));
try {
catalog.createDatabase(namespace[0], false);
catalog.createDatabase(namespace[0], false, metadata);
} catch (Catalog.DatabaseAlreadyExistException e) {
throw new NamespaceAlreadyExistsException(namespace);
}
Expand Down Expand Up @@ -142,10 +142,12 @@ public Map<String, String> loadNamespaceMetadata(String[] namespace)
isValidateNamespace(namespace),
"Namespace %s is not valid",
Arrays.toString(namespace));
if (catalog.databaseExists(namespace[0])) {
return Collections.emptyMap();
String dataBaseName = namespace[0];
try {
return catalog.loadDatabaseProperties(dataBaseName);
} catch (Catalog.DatabaseNotExistException e) {
throw new NoSuchNamespaceException(namespace);
}
throw new NoSuchNamespaceException(namespace);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,34 @@
package org.apache.paimon.spark

import org.apache.paimon.hive.TestHiveMetastore
import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

import org.apache.spark.SparkConf
import org.apache.spark.paimon.Utils

import java.io.File

class PaimonHiveTestBase extends PaimonSparkTestBase {

protected lazy val tempHiveDBDir: File = Utils.createTempDir

protected lazy val testHiveMetastore: TestHiveMetastore = new TestHiveMetastore

protected val paimonHiveCatalog: String = "paimon_hive"

protected val hiveDbName: String = "test_hive"

/**
 * Registers two catalogs backed by the test Hive metastore: `spark_catalog` (a
 * [[SparkGenericCatalog]]) and `paimon_hive` (a [[SparkCatalog]]).
 */
override protected def sparkConf: SparkConf = {
  val hiveWarehouse = tempHiveDBDir.getCanonicalPath
  super.sparkConf
    .set("spark.sql.warehouse.dir", hiveWarehouse)
    .set("spark.sql.catalogImplementation", "hive")
    .set("spark.sql.catalog.spark_catalog", classOf[SparkGenericCatalog[_]].getName)
    .set("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName)
    .set(s"spark.sql.catalog.$paimonHiveCatalog", classOf[SparkCatalog].getName)
    .set(s"spark.sql.catalog.$paimonHiveCatalog.metastore", "hive")
    .set(s"spark.sql.catalog.$paimonHiveCatalog.warehouse", hiveWarehouse)
}

override protected def beforeAll(): Unit = {
Expand All @@ -55,8 +67,8 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
}
}

/** Reset the session to the default `spark_catalog` before each test. */
override protected def beforeEach(): Unit = {
  // Plain string literal: the previous s-interpolator had no substitutions.
  spark.sql("USE spark_catalog")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTab

protected val tableName0: String = "T"

/** Add paimon ([[SparkCatalog]] in fileSystem) catalog */
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
Expand All @@ -70,6 +71,7 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTab
}
}

/** Default is paimon catalog */
override protected def beforeEach(): Unit = {
super.beforeAll()
spark.sql(s"USE paimon")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {

import testImplicits._

test("Paimon: Create Table As Select") {
test("Paimon DDL: Create Table As Select") {
withTable("source", "t1", "t2") {
Seq((1L, "x1", "2023"), (2L, "x2", "2023"))
.toDF("a", "b", "pt")
Expand Down Expand Up @@ -58,4 +58,16 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}

// The filesystem catalog pins every database under the warehouse path, so an
// explicit LOCATION clause must be rejected with a clear error.
test("Paimon DDL: create database with location with filesystem catalog") {
  withTempDir { dbLocation =>
    withDatabase("paimon_db") {
      val message = intercept[Exception] {
        spark.sql(s"CREATE DATABASE paimon_db LOCATION '${dbLocation.getCanonicalPath}'")
      }.getMessage
      assert(
        message.contains("Cannot specify location for a database when using fileSystem catalog."))
    }
  }
}
}
Loading

0 comments on commit f0f653e

Please sign in to comment.