From 5fbf72763159eda632936c7f0b0b05794899b7be Mon Sep 17 00:00:00 2001
From: Zouxxyy
Date: Thu, 11 Apr 2024 16:28:45 +0800
Subject: [PATCH] [spark] Support config defaultDatabase for spark catalog
 (#3130)

---
 docs/content/maintenance/configurations.md     |  6 +++
 .../spark_catalog_configuration.html           | 42 ++++++++++++++++
 .../spark_connector_configuration.html         |  6 ---
 .../paimon/spark/utils/SQLConfUtils.java       | 29 +++++++++++
 paimon-spark/paimon-spark-3.2/pom.xml          | 38 ++++++++++++++
 .../paimon/spark/utils/SQLConfUtils.java       | 29 +++++++++++
 .../src/test/resources/hive-site.xml           | 50 +++++++++++++++++++
 .../spark/sql/DDLWithHiveCatalogTest.scala     | 23 +++++++++
 paimon-spark/paimon-spark-3.3/pom.xml          | 38 ++++++++++++++
 .../paimon/spark/utils/SQLConfUtils.java       | 29 +++++++++++
 .../src/test/resources/hive-site.xml           | 50 +++++++++++++++++++
 .../spark/sql/DDLWithHiveCatalogTest.scala     | 23 +++++++++
 paimon-spark/paimon-spark-3.4/pom.xml          | 38 ++++++++++++++
 .../src/test/resources/hive-site.xml           | 50 +++++++++++++++++++
 .../spark/sql/DDLWithHiveCatalogTest.scala     | 21 ++++++++
 paimon-spark/paimon-spark-3.5/pom.xml          | 38 ++++++++++++++
 .../src/test/resources/hive-site.xml           | 50 +++++++++++++++++++
 .../src/test/resources/log4j2-test.properties  | 38 ++++++++++++++
 .../spark/sql/DDLWithHiveCatalogTest.scala     | 21 ++++++++
 .../org/apache/paimon/spark/SparkCatalog.java  |  7 ++-
 .../paimon/spark/SparkCatalogOptions.java      | 40 +++++++++++++++
 .../paimon/spark/SparkConnectorOptions.java    |  6 ---
 .../paimon/spark/SparkGenericCatalog.java      | 24 +++++++--
 .../paimon/spark/utils/SQLConfUtils.java       | 28 +++++++++++
 .../paimon/spark/PaimonHiveTestBase.scala      | 17 ++++++-
 ...scala => DDLWithHiveCatalogTestBase.scala}  | 41 ++++++++++++++-
 26 files changed, 762 insertions(+), 20 deletions(-)
 create mode 100644 docs/layouts/shortcodes/generated/spark_catalog_configuration.html
 create mode 100644 paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java
 create mode 100644 paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java
 create mode 100644 paimon-spark/paimon-spark-3.2/src/test/resources/hive-site.xml
 create mode 100644 paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
 create mode 100644 paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java
 create mode 100644 paimon-spark/paimon-spark-3.3/src/test/resources/hive-site.xml
 create mode 100644 paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
 create mode 100644 paimon-spark/paimon-spark-3.4/src/test/resources/hive-site.xml
 create mode 100644 paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
 create mode 100644 paimon-spark/paimon-spark-3.5/src/test/resources/hive-site.xml
 create mode 100644 paimon-spark/paimon-spark-3.5/src/test/resources/log4j2-test.properties
 create mode 100644 paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
 create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
 create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/SQLConfUtils.java
 rename paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/{DDLWithHiveCatalogTest.scala => DDLWithHiveCatalogTestBase.scala} (73%)

diff --git a/docs/content/maintenance/configurations.md b/docs/content/maintenance/configurations.md
index 564b7c44c9df..3849d70a5862 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -68,6 +68,12 @@ Flink connector options for paimon.
 
 {{< generated/flink_connector_configuration >}}
 
+### SparkCatalogOptions
+
+Spark catalog options for paimon.
+
+{{< generated/spark_catalog_configuration >}}
+
 ### SparkConnectorOptions
 
 Spark connector options for paimon.
diff --git a/docs/layouts/shortcodes/generated/spark_catalog_configuration.html b/docs/layouts/shortcodes/generated/spark_catalog_configuration.html
new file mode 100644
index 000000000000..a3431959a163
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/spark_catalog_configuration.html
@@ -0,0 +1,42 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>catalog.create-underlying-session-catalog</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, create and use an underlying session catalog instead of the default session catalog when using SparkGenericCatalog.</td>
+        </tr>
+        <tr>
+            <td><h5>defaultDatabase</h5></td>
+            <td style="word-wrap: break-word;">"default"</td>
+            <td>String</td>
+            <td>The default database name.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/layouts/shortcodes/generated/spark_connector_configuration.html b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 9e74cefbcfab..00ca2ba17d39 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -26,12 +26,6 @@
         </tr>
     </thead>
     <tbody>
-        <tr>
-            <td><h5>catalog.create-underlying-session-catalog</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>If true, create and use an underlying session catalog instead of default session catalog when use SparkGenericCatalog.</td>
-        </tr>
         <tr>
            <td><h5>read.changelog</h5></td>
            <td style="word-wrap: break-word;">false</td>
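[Editor's note: the row removed here is re-added under spark_catalog_configuration.html above, since the option now belongs to SparkCatalogOptions. A hedged sketch of enabling it, assuming Spark's usual spark.sql.catalog.<name>.<key> option passing:]

    import org.apache.spark.sql.SparkSession

    // Sketch: SparkGenericCatalog replaces the built-in session catalog and,
    // with this option set, creates its own underlying session catalog
    // instead of delegating to Spark's.
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.paimon.spark.SparkGenericCatalog")
      .config(
        "spark.sql.catalog.spark_catalog.catalog.create-underlying-session-catalog",
        "true")
      .getOrCreate()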
diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java
new file mode 100644
index 000000000000..38a4b4fccf3e
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.utils;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.spark.sql.internal.SQLConf;
+
+/** SQLConf utils. */
+public class SQLConfUtils {
+    public static String defaultDatabase(SQLConf sqlConf) {
+        return Catalog.DEFAULT_DATABASE;
+    }
+}
diff --git a/paimon-spark/paimon-spark-3.2/pom.xml b/paimon-spark/paimon-spark-3.2/pom.xml
index 4f60982ff49c..971e0f9ec540 100644
--- a/paimon-spark/paimon-spark-3.2/pom.xml
+++ b/paimon-spark/paimon-spark-3.2/pom.xml
@@ -168,6 +168,44 @@ under the License.
             <version>3.1.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.12</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java
new file mode 100644
index 000000000000..38a4b4fccf3e
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.utils;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.spark.sql.internal.SQLConf;
+
+/** SQLConf utils. */
+public class SQLConfUtils {
+    public static String defaultDatabase(SQLConf sqlConf) {
+        return Catalog.DEFAULT_DATABASE;
+    }
+}
diff --git a/paimon-spark/paimon-spark-3.2/src/test/resources/hive-site.xml b/paimon-spark/paimon-spark-3.2/src/test/resources/hive-site.xml
new file mode 100644
index 000000000000..db49836572c0
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/test/resources/hive-site.xml
@@ -0,0 +1,50 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+    <property>
+        <name>hive.metastore.integral.jdo.pushdown</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.schema.verification</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.client.capability.check</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateTables</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateAll</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.uris</name>
+        <value>thrift://localhost:9087</value>
+        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
new file mode 100644
index 000000000000..2a6d04e4ddee
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DDLWithHiveCatalogTest extends DDLWithHiveCatalogTestBase {
+  override def supportDefaultDatabaseWithSessionCatalog = false
+}
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml b/paimon-spark/paimon-spark-3.3/pom.xml
index fbf05f4942b7..a1e8986776c8 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -159,6 +159,44 @@ under the License.
             <version>3.1.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.12</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java
new file mode 100644
index 000000000000..38a4b4fccf3e
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/utils/SQLConfUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.utils;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.spark.sql.internal.SQLConf;
+
+/** SQLConf utils. */
+public class SQLConfUtils {
+    public static String defaultDatabase(SQLConf sqlConf) {
+        return Catalog.DEFAULT_DATABASE;
+    }
+}
diff --git a/paimon-spark/paimon-spark-3.3/src/test/resources/hive-site.xml b/paimon-spark/paimon-spark-3.3/src/test/resources/hive-site.xml
new file mode 100644
index 000000000000..d975445782bf
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/test/resources/hive-site.xml
@@ -0,0 +1,50 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+    <property>
+        <name>hive.metastore.integral.jdo.pushdown</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.schema.verification</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.client.capability.check</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateTables</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateAll</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.uris</name>
+        <value>thrift://localhost:9088</value>
+        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
new file mode 100644
index 000000000000..2a6d04e4ddee
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DDLWithHiveCatalogTest extends DDLWithHiveCatalogTestBase {
+  override def supportDefaultDatabaseWithSessionCatalog = false
+}
diff --git a/paimon-spark/paimon-spark-3.4/pom.xml b/paimon-spark/paimon-spark-3.4/pom.xml
index 8fadb8ab0a42..4a97f873c8a8 100644
--- a/paimon-spark/paimon-spark-3.4/pom.xml
+++ b/paimon-spark/paimon-spark-3.4/pom.xml
@@ -159,6 +159,44 @@ under the License.
             <version>3.1.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.12</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/paimon-spark/paimon-spark-3.4/src/test/resources/hive-site.xml b/paimon-spark/paimon-spark-3.4/src/test/resources/hive-site.xml
new file mode 100644
index 000000000000..5d5113925ce4
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/resources/hive-site.xml
@@ -0,0 +1,50 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+    <property>
+        <name>hive.metastore.integral.jdo.pushdown</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.schema.verification</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.client.capability.check</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateTables</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateAll</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.uris</name>
+        <value>thrift://localhost:9089</value>
+        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
new file mode 100644
index 000000000000..a9ea3efc89ba
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DDLWithHiveCatalogTest extends DDLWithHiveCatalogTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/pom.xml b/paimon-spark/paimon-spark-3.5/pom.xml
index 270a18089ae3..67fdd11a5edb 100644
--- a/paimon-spark/paimon-spark-3.5/pom.xml
+++ b/paimon-spark/paimon-spark-3.5/pom.xml
@@ -159,6 +159,44 @@ under the License.
             <version>3.1.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.12</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/paimon-spark/paimon-spark-3.5/src/test/resources/hive-site.xml b/paimon-spark/paimon-spark-3.5/src/test/resources/hive-site.xml
new file mode 100644
index 000000000000..b82e118692b2
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.5/src/test/resources/hive-site.xml
@@ -0,0 +1,50 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+    <property>
+        <name>hive.metastore.integral.jdo.pushdown</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.schema.verification</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.client.capability.check</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateTables</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>datanucleus.schema.autoCreateAll</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>hive.metastore.uris</name>
+        <value>thrift://localhost:9090</value>
+        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/paimon-spark/paimon-spark-3.5/src/test/resources/log4j2-test.properties b/paimon-spark/paimon-spark-3.5/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000000..6f324f5863ac
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.5/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
new file mode 100644
index 000000000000..a9ea3efc89ba
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class DDLWithHiveCatalogTest extends DDLWithHiveCatalogTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 7e77d55a99ab..8f98ea91e132 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -54,6 +54,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 
 /** Spark {@link TableCatalog} for paimon. */
@@ -65,6 +66,8 @@ public class SparkCatalog extends SparkBaseCatalog {
 
     protected Catalog catalog = null;
 
+    private String defaultDatabase;
+
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
         this.catalogName = name;
@@ -73,6 +76,8 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
                         Options.fromMap(options),
                         SparkSession.active().sessionState().newHadoopConf());
         this.catalog = CatalogFactory.createCatalog(catalogContext);
+        this.defaultDatabase =
+                options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
         if (!catalog.databaseExists(defaultNamespace()[0])) {
             try {
                 createNamespace(defaultNamespace(), new HashMap<>());
@@ -88,7 +93,7 @@ public Catalog paimonCatalog() {
 
     @Override
     public String[] defaultNamespace() {
-        return new String[] {Catalog.DEFAULT_DATABASE};
+        return new String[] {defaultDatabase};
     }
 
     @Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
new file mode 100644
index 000000000000..d990d4c864b5
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.ConfigOption;
+
+import static org.apache.paimon.options.ConfigOptions.key;
+
+/** Options for spark catalog. */
+public class SparkCatalogOptions {
+    public static final ConfigOption<Boolean> CREATE_UNDERLYING_SESSION_CATALOG =
+            key("catalog.create-underlying-session-catalog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, create and use an underlying session catalog instead of the default session catalog when using SparkGenericCatalog.");
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            key("defaultDatabase")
+                    .stringType()
+                    .defaultValue(Catalog.DEFAULT_DATABASE)
+                    .withDescription("The default database name.");
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index e553fb50747d..4ddbc6490f42 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -25,12 +25,6 @@
 
 /** Options for spark connector. */
 public class SparkConnectorOptions {
-    public static final ConfigOption<Boolean> CREATE_UNDERLYING_SESSION_CATALOG =
-            key("catalog.create-underlying-session-catalog")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "If true, create and use an underlying session catalog instead of default session catalog when use SparkGenericCatalog.");
 
     public static final ConfigOption<Boolean> MERGE_SCHEMA =
             key("write.merge-schema")
                     .booleanType()
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 62674bf54dbd..5b947302c229 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.hive.HiveCatalogOptions;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.spark.catalog.SparkBaseCatalog;
+import org.apache.paimon.spark.utils.SQLConfUtils;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
@@ -63,6 +64,8 @@
 
 import static org.apache.paimon.options.CatalogOptions.METASTORE;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+import static org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG;
+import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
 
 /* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -77,8 +80,6 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExtension {
 
     private static final Logger LOG = LoggerFactory.getLogger(SparkGenericCatalog.class);
 
-    private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
-
     private SparkCatalog sparkCatalog = null;
 
     private boolean underlyingSessionCatalogEnabled = false;
@@ -92,7 +93,7 @@ public Catalog paimonCatalog() {
 
     @Override
     public String[] defaultNamespace() {
-        return DEFAULT_NAMESPACE;
+        return asNamespaceCatalog().defaultNamespace();
     }
 
     @Override
@@ -253,7 +254,8 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
         sparkCatalog.initialize(name, newOptions);
 
         if (options.getBoolean(
-                SparkConnectorOptions.CREATE_UNDERLYING_SESSION_CATALOG.key(), false)) {
+                CREATE_UNDERLYING_SESSION_CATALOG.key(),
+                CREATE_UNDERLYING_SESSION_CATALOG.defaultValue())) {
             this.underlyingSessionCatalogEnabled = true;
             for (Map.Entry<String, String> entry : options.entrySet()) {
                 sparkConf.set("spark.hadoop." + entry.getKey(), entry.getValue());
@@ -295,10 +297,22 @@ private void fillCommonConfigurations(Map<String, String> options, SQLConf sqlConf) {
                 options.put(METASTORE.key(), metastore);
             }
         }
+        String sessionCatalogDefaultDatabase = SQLConfUtils.defaultDatabase(sqlConf);
+        if (options.containsKey(DEFAULT_DATABASE.key())) {
+            String userDefinedDefaultDatabase = options.get(DEFAULT_DATABASE.key());
+            if (!userDefinedDefaultDatabase.equals(sessionCatalogDefaultDatabase)) {
+                LOG.warn(
+                        String.format(
+                                "The current Spark version does not support configuring the default database, switching to database %s",
+                                sessionCatalogDefaultDatabase));
+                options.put(DEFAULT_DATABASE.key(), sessionCatalogDefaultDatabase);
+            }
+        } else {
+            options.put(DEFAULT_DATABASE.key(), sessionCatalogDefaultDatabase);
+        }
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public void setDelegateCatalog(CatalogPlugin delegate) {
         if (!underlyingSessionCatalogEnabled) {
             this.sessionCatalog = delegate;
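[Editor's note: a sketch of the fallback above, assuming a local session; my_db is a placeholder. On the 3.4/3.5 builds, which use the common SQLConfUtils below, the session catalog's default database is read from SQLConf and a matching user setting is honored; on the 3.1-3.3 builds, whose SQLConfUtils shims always return Paimon's "default", any other configured value triggers the warning above and is replaced.]

    import org.apache.spark.sql.SparkSession

    // Sketch: SparkGenericCatalog as the session catalog with a configured
    // default database.
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.paimon.spark.SparkGenericCatalog")
      .config("spark.sql.catalog.spark_catalog.defaultDatabase", "my_db")
      .getOrCreate()

    // Spark 3.4+: the session starts in my_db.
    // Spark 3.1-3.3: fillCommonConfigurations logs the warning above and the
    // session stays in the session catalog's default database.
    spark.sql("SHOW TABLES").show()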
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/SQLConfUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/SQLConfUtils.java
new file mode 100644
index 000000000000..c7055ab7a437
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/SQLConfUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.utils;
+
+import org.apache.spark.sql.internal.SQLConf;
+
+/** SQLConf utils. */
+public class SQLConfUtils {
+    public static String defaultDatabase(SQLConf sqlConf) {
+        return sqlConf.defaultDatabase();
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index 3b07573efdcf..a87502b2242b 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.hive.TestHiveMetastore
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
 import org.apache.spark.paimon.Utils
 
@@ -27,6 +28,8 @@ import java.io.File
 
 class PaimonHiveTestBase extends PaimonSparkTestBase {
 
+  import PaimonHiveTestBase._
+
   protected lazy val tempHiveDBDir: File = Utils.createTempDir
 
   protected lazy val testHiveMetastore: TestHiveMetastore = new TestHiveMetastore
@@ -47,10 +50,11 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
       .set(s"spark.sql.catalog.$paimonHiveCatalogName", classOf[SparkCatalog].getName)
       .set(s"spark.sql.catalog.$paimonHiveCatalogName.metastore", "hive")
       .set(s"spark.sql.catalog.$paimonHiveCatalogName.warehouse", tempHiveDBDir.getCanonicalPath)
+      .set(s"spark.sql.catalog.$paimonHiveCatalogName.uri", hiveUri)
   }
 
   override protected def beforeAll(): Unit = {
-    testHiveMetastore.start()
+    testHiveMetastore.start(hivePort)
     super.beforeAll()
     spark.sql(s"USE spark_catalog")
     spark.sql(s"CREATE DATABASE IF NOT EXISTS $hiveDbName")
@@ -73,3 +77,14 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
     spark.sql(s"USE $hiveDbName")
   }
 }
+
+object PaimonHiveTestBase {
+
+  val hiveUri: String = {
+    val hadoopConf = new Configuration()
+    hadoopConf.addResource(getClass.getClassLoader.getResourceAsStream("hive-site.xml"))
+    hadoopConf.get("hive.metastore.uris")
+  }
+
+  val hivePort: Int = hiveUri.split(":")(2).toInt
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
similarity index 73%
rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
rename to paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 4c33b7cf94f9..a4ef0a55dc08 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -20,9 +20,10 @@ package org.apache.paimon.spark.sql
 
 import org.apache.paimon.spark.PaimonHiveTestBase
 
+import org.apache.spark.sql.{Row, SparkSession}
 import org.junit.jupiter.api.Assertions
 
-class DDLWithHiveCatalogTest extends PaimonHiveTestBase {
+abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
 
   test("Paimon DDL with hive catalog: create database with location and comment") {
     Seq("spark_catalog", paimonHiveCatalogName).foreach {
@@ -65,6 +66,44 @@ class DDLWithHiveCatalogTest extends PaimonHiveTestBase {
     }
   }
 
+  test("Paimon DDL with hive catalog: set default database") {
+    var reusedSpark = spark
+
+    Seq("paimon", "spark_catalog", paimonHiveCatalogName).foreach {
+      catalogName =>
+        {
+          val dbName = s"${catalogName}_default_db"
+          val tblName = s"${dbName}_tbl"
+
+          reusedSpark.sql(s"use $catalogName")
+          reusedSpark.sql(s"create database $dbName")
+          reusedSpark.sql(s"use $dbName")
+          reusedSpark.sql(s"create table $tblName (id int, name string, dt string) using paimon")
+          reusedSpark.stop()
+
+          reusedSpark = SparkSession
+            .builder()
+            .master("local[2]")
+            .config(sparkConf)
+            .config("spark.sql.defaultCatalog", catalogName)
+            .config(s"spark.sql.catalog.$catalogName.defaultDatabase", dbName)
+            .getOrCreate()
+
+          if (catalogName.equals("spark_catalog") && !supportDefaultDatabaseWithSessionCatalog) {
+            checkAnswer(reusedSpark.sql("show tables").select("tableName"), Nil)
+            reusedSpark.sql(s"use $dbName")
+          }
+          checkAnswer(reusedSpark.sql("show tables").select("tableName"), Row(tblName) :: Nil)
+
+          reusedSpark.sql(s"drop table $tblName")
+        }
+    }
+
+    reusedSpark.stop()
+  }
+
+  def supportDefaultDatabaseWithSessionCatalog = true
+
   def getDatabaseLocation(dbName: String): String = {
     spark
       .sql(s"DESC DATABASE $dbName")