From f0f653e47efc3afea4b6ded9ca617b176ab16800 Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Tue, 16 Jan 2024 18:27:03 +0800 Subject: [PATCH] [core][spark] Support create database with location, comment, props using hive catalog (#2698) --- .../paimon/catalog/AbstractCatalog.java | 20 ++- .../org/apache/paimon/catalog/Catalog.java | 24 +++- .../paimon/catalog/FileSystemCatalog.java | 23 +++- .../org/apache/paimon/flink/FlinkCatalog.java | 1 + .../org/apache/paimon/hive/HiveCatalog.java | 56 +++++++-- .../org/apache/paimon/spark/SparkCatalog.java | 10 +- .../paimon/spark/PaimonHiveTestBase.scala | 20 ++- .../paimon/spark/PaimonSparkTestBase.scala | 2 + .../apache/paimon/spark/sql/DDLTestBase.scala | 14 ++- .../spark/sql/DDLWithHiveCatalogTest.scala | 117 ++++++++++++++++++ 10 files changed, 265 insertions(+), 22 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 0c1716260f71..22f1b4ac8db0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -59,6 +59,7 @@ public abstract class AbstractCatalog implements Catalog { public static final String DB_SUFFIX = ".db"; protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; + protected static final String DB_LOCATION_PROP = "location"; protected final FileIO fileIO; protected final Map tableDefaultOptions; @@ -94,7 +95,7 @@ public boolean databaseExists(String databaseName) { protected abstract boolean databaseExistsImpl(String databaseName); @Override - public void createDatabase(String name, boolean ignoreIfExists) + public void createDatabase(String name, boolean ignoreIfExists, Map properties) throws DatabaseAlreadyExistException { if (isSystemDatabase(name)) { throw new ProcessSystemDatabaseException(); @@ -105,10 +106,23 @@ public void createDatabase(String name, boolean ignoreIfExists) } throw new DatabaseAlreadyExistException(name); } + createDatabaseImpl(name, properties); + } - createDatabaseImpl(name); + @Override + public Map loadDatabaseProperties(String name) + throws DatabaseNotExistException { + if (isSystemDatabase(name)) { + return Collections.emptyMap(); + } + if (!databaseExists(name)) { + throw new DatabaseNotExistException(name); + } + return loadDatabasePropertiesImpl(name); } + protected abstract Map loadDatabasePropertiesImpl(String name); + @Override public void dropPartition(Identifier identifier, Map partitionSpec) throws TableNotExistException { @@ -119,7 +133,7 @@ public void dropPartition(Identifier identifier, Map partitionSp Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER); } - protected abstract void createDatabaseImpl(String name); + protected abstract void createDatabaseImpl(String name, Map properties); @Override public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 99295c6bdb75..005f5b16aad3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -72,16 +72,36 @@ default Optional metastoreClientFactory(Identifier iden boolean databaseExists(String databaseName); /** - * Create a database. + * Create a database, see {@link Catalog#createDatabase(String name, boolean ignoreIfExists, Map + * properties)}. + */ + default void createDatabase(String name, boolean ignoreIfExists) + throws DatabaseAlreadyExistException { + createDatabase(name, ignoreIfExists, Collections.emptyMap()); + } + + /** + * Create a database with properties. * * @param name Name of the database to be created * @param ignoreIfExists Flag to specify behavior when a database with the given name already * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do * nothing. + * @param properties properties to be associated with the database * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists * is false */ - void createDatabase(String name, boolean ignoreIfExists) throws DatabaseAlreadyExistException; + void createDatabase(String name, boolean ignoreIfExists, Map properties) + throws DatabaseAlreadyExistException; + + /** + * Load database properties. + * + * @param name Database name + * @return The requested database's properties + * @throws DatabaseNotExistException if the requested database does not exist + */ + Map loadDatabaseProperties(String name) throws DatabaseNotExistException; /** * Drop a database. diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 03523da580ee..e458dad7c34e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -27,8 +27,13 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; @@ -37,6 +42,8 @@ /** A catalog implementation for {@link FileIO}. */ public class FileSystemCatalog extends AbstractCatalog { + private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalog.class); + private final Path warehouse; public FileSystemCatalog(FileIO fileIO, Path warehouse) { @@ -72,10 +79,24 @@ protected boolean databaseExistsImpl(String databaseName) { } @Override - protected void createDatabaseImpl(String name) { + protected void createDatabaseImpl(String name, Map properties) { + if (properties.containsKey(AbstractCatalog.DB_LOCATION_PROP)) { + throw new IllegalArgumentException( + "Cannot specify location for a database when using fileSystem catalog."); + } + if (!properties.isEmpty()) { + LOG.warn( + "Currently filesystem catalog can't store database properties, discard properties: {}", + properties); + } uncheck(() -> fileIO.mkdirs(newDatabasePath(name))); } + @Override + public Map loadDatabasePropertiesImpl(String name) { + return Collections.emptyMap(); + } + @Override protected void dropDatabaseImpl(String name) { uncheck(() -> fileIO.delete(newDatabasePath(name), true)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 157e9b8639dd..98ca840f0e65 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -191,6 +191,7 @@ public CatalogDatabase getDatabase(String databaseName) @Override public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException { + // todo: flink hive catalog support create db with props if (database != null) { if (database.getProperties().size() > 0) { throw new UnsupportedOperationException( diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 18cb3cea9b5d..279b83b28907 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -94,11 +94,13 @@ public class HiveCatalog extends AbstractCatalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); - // we don't include paimon-hive-connector as dependencies because it depends on - // hive-exec + // Reserved properties + public static final String DB_COMMENT_PROP = "comment"; public static final String TABLE_TYPE_PROP = "table_type"; public static final String PAIMON_TABLE_TYPE_VALUE = "paimon"; + // we don't include paimon-hive-connector as dependencies because it depends on + // hive-exec private static final String INPUT_FORMAT_CLASS_NAME = "org.apache.paimon.hive.mapred.PaimonInputFormat"; private static final String OUTPUT_FORMAT_CLASS_NAME = @@ -219,13 +221,14 @@ protected boolean databaseExistsImpl(String databaseName) { } @Override - protected void createDatabaseImpl(String name) { + protected void createDatabaseImpl(String name, Map properties) { try { - Path databasePath = newDatabasePath(name); + Database database = convertToHiveDatabase(name, properties); + Path databasePath = + database.getLocationUri() == null + ? newDatabasePath(name) + : new Path(database.getLocationUri()); locationHelper.createPathIfRequired(databasePath, fileIO); - - Database database = new Database(); - database.setName(name); locationHelper.specifyDatabaseLocation(databasePath, database); client.createDatabase(database); } catch (TException | IOException e) { @@ -233,6 +236,45 @@ protected void createDatabaseImpl(String name) { } } + private Database convertToHiveDatabase(String name, Map properties) { + Database database = new Database(); + database.setName(name); + Map parameter = new HashMap<>(); + properties.forEach( + (key, value) -> { + if (key.equals(DB_COMMENT_PROP)) { + database.setDescription(value); + } else if (key.equals(DB_LOCATION_PROP)) { + database.setLocationUri(value); + } else if (value != null) { + parameter.put(key, value); + } + }); + database.setParameters(parameter); + return database; + } + + @Override + public Map loadDatabasePropertiesImpl(String name) { + try { + return convertToProperties(client.getDatabase(name)); + } catch (TException e) { + throw new RuntimeException( + String.format("Failed to get database %s properties", name), e); + } + } + + private Map convertToProperties(Database database) { + Map properties = new HashMap<>(database.getParameters()); + if (database.getLocationUri() != null) { + properties.put(DB_LOCATION_PROP, database.getLocationUri()); + } + if (database.getDescription() != null) { + properties.put(DB_COMMENT_PROP, database.getDescription()); + } + return properties; + } + @Override protected void dropDatabaseImpl(String name) { try { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 8b716211bd57..4381813604e1 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -105,7 +105,7 @@ public void createNamespace(String[] namespace, Map metadata) "Namespace %s is not valid", Arrays.toString(namespace)); try { - catalog.createDatabase(namespace[0], false); + catalog.createDatabase(namespace[0], false, metadata); } catch (Catalog.DatabaseAlreadyExistException e) { throw new NamespaceAlreadyExistsException(namespace); } @@ -142,10 +142,12 @@ public Map loadNamespaceMetadata(String[] namespace) isValidateNamespace(namespace), "Namespace %s is not valid", Arrays.toString(namespace)); - if (catalog.databaseExists(namespace[0])) { - return Collections.emptyMap(); + String dataBaseName = namespace[0]; + try { + return catalog.loadDatabaseProperties(dataBaseName); + } catch (Catalog.DatabaseNotExistException e) { + throw new NoSuchNamespaceException(namespace); } - throw new NoSuchNamespaceException(namespace); } /** diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala index 9d5c5175430c..4c81d9508655 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala @@ -18,22 +18,34 @@ package org.apache.paimon.spark import org.apache.paimon.hive.TestHiveMetastore -import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions import org.apache.spark.SparkConf +import org.apache.spark.paimon.Utils + +import java.io.File class PaimonHiveTestBase extends PaimonSparkTestBase { + protected lazy val tempHiveDBDir: File = Utils.createTempDir + protected lazy val testHiveMetastore: TestHiveMetastore = new TestHiveMetastore + protected val paimonHiveCatalog: String = "paimon_hive" + protected val hiveDbName: String = "test_hive" + /** + * Add spark_catalog ([[SparkGenericCatalog]] in hive) and paimon_hive ([[SparkCatalog]] in hive) + * catalog + */ override protected def sparkConf: SparkConf = { super.sparkConf - .set("spark.sql.warehouse.dir", tempDBDir.getCanonicalPath) + .set("spark.sql.warehouse.dir", tempHiveDBDir.getCanonicalPath) .set("spark.sql.catalogImplementation", "hive") .set("spark.sql.catalog.spark_catalog", classOf[SparkGenericCatalog[_]].getName) - .set("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName) + .set(s"spark.sql.catalog.$paimonHiveCatalog", classOf[SparkCatalog].getName) + .set(s"spark.sql.catalog.$paimonHiveCatalog.metastore", "hive") + .set(s"spark.sql.catalog.$paimonHiveCatalog.warehouse", tempHiveDBDir.getCanonicalPath) } override protected def beforeAll(): Unit = { @@ -55,8 +67,8 @@ class PaimonHiveTestBase extends PaimonSparkTestBase { } } + /** Default is spark_catalog */ override protected def beforeEach(): Unit = { spark.sql(s"USE spark_catalog") } - } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala index b2484eef91af..d1618c5429b7 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala @@ -46,6 +46,7 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTab protected val tableName0: String = "T" + /** Add paimon ([[SparkCatalog]] in fileSystem) catalog */ override protected def sparkConf: SparkConf = { super.sparkConf .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName) @@ -70,6 +71,7 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTab } } + /** Default is paimon catalog */ override protected def beforeEach(): Unit = { super.beforeAll() spark.sql(s"USE paimon") diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala index 8bb3bd5e84ff..9a681137b0b6 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala @@ -25,7 +25,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase { import testImplicits._ - test("Paimon: Create Table As Select") { + test("Paimon DDL: Create Table As Select") { withTable("source", "t1", "t2") { Seq((1L, "x1", "2023"), (2L, "x2", "2023")) .toDF("a", "b", "pt") @@ -58,4 +58,16 @@ abstract class DDLTestBase extends PaimonSparkTestBase { } } + test("Paimon DDL: create database with location with filesystem catalog") { + withTempDir { + dBLocation => + withDatabase("paimon_db") { + val error = intercept[Exception] { + spark.sql(s"CREATE DATABASE paimon_db LOCATION '${dBLocation.getCanonicalPath}'") + }.getMessage + assert( + error.contains("Cannot specify location for a database when using fileSystem catalog.")) + } + } + } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala new file mode 100644 index 000000000000..8fd4ace242ee --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonHiveTestBase + +import org.junit.jupiter.api.Assertions + +class DDLWithHiveCatalogTest extends PaimonHiveTestBase { + + test("Paimon DDL with hive catalog: create database with location and comment") { + Seq("spark_catalog", paimonHiveCatalog).foreach { + catalogName => + spark.sql(s"USE $catalogName") + withTempDir { + dBLocation => + withDatabase("paimon_db") { + val comment = "this is a test comment" + spark.sql( + s"CREATE DATABASE paimon_db LOCATION '${dBLocation.getCanonicalPath}' COMMENT '$comment'") + Assertions.assertEquals(getDatabaseLocation("paimon_db"), dBLocation.getCanonicalPath) + Assertions.assertEquals(getDatabaseComment("paimon_db"), comment) + + withTable("paimon_db.paimon_tbl") { + spark.sql(s""" + |CREATE TABLE paimon_db.paimon_tbl (id STRING, name STRING, pt STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key' = 'id') + |""".stripMargin) + Assertions.assertEquals( + getTableLocation("paimon_db.paimon_tbl"), + s"${dBLocation.getCanonicalPath}/paimon_tbl") + } + } + } + } + } + + test("Paimon DDL with hive catalog: create database with props") { + Seq("spark_catalog", paimonHiveCatalog).foreach { + catalogName => + spark.sql(s"USE $catalogName") + withDatabase("paimon_db") { + spark.sql(s"CREATE DATABASE paimon_db WITH DBPROPERTIES ('k1' = 'v1', 'k2' = 'v2')") + val props = getDatabaseProps("paimon_db") + Assertions.assertEquals(props("k1"), "v1") + Assertions.assertEquals(props("k2"), "v2") + } + } + } + + def getDatabaseLocation(dbName: String): String = { + spark + .sql(s"DESC DATABASE $dbName") + .filter("info_name == 'Location'") + .head() + .getAs[String]("info_value") + .split(":")(1) + } + + def getDatabaseComment(dbName: String): String = { + spark + .sql(s"DESC DATABASE $dbName") + .filter("info_name == 'Comment'") + .head() + .getAs[String]("info_value") + } + + def getDatabaseProps(dbName: String): Map[String, String] = { + val dbPropsStr = spark + .sql(s"DESC DATABASE EXTENDED $dbName") + .filter("info_name == 'Properties'") + .head() + .getAs[String]("info_value") + val pattern = "\\(([^,]+),([^)]+)\\)".r + pattern + .findAllIn(dbPropsStr.drop(1).dropRight(1)) + .matchData + .map { + m => + val key = m.group(1).trim + val value = m.group(2).trim + (key, value) + } + .toMap + } + + def getTableLocation(tblName: String): String = { + val tablePropsStr = spark + .sql(s"DESC TABLE EXTENDED $tblName") + .filter("col_name == 'Table Properties'") + .head() + .getAs[String]("data_type") + val tableProps = tablePropsStr + .substring(1, tablePropsStr.length - 1) + .split(",") + .map(_.split("=")) + .map { case Array(key, value) => (key, value) } + .toMap + tableProps("path").split(":")(1) + } +}