From 762afcc99989e17f7a0869f4400b06fefcd70c11 Mon Sep 17 00:00:00 2001 From: Yann Date: Thu, 7 Mar 2024 13:44:30 +0800 Subject: [PATCH] [spark] use the internal session catalog in spark generic catalog --- .../org/apache/paimon/spark/SparkCatalog.java | 8 +- .../paimon/spark/SparkConnectorOptions.java | 7 ++ .../paimon/spark/SparkGenericCatalog.java | 101 ++++++++++-------- .../spark/catalog/SparkBaseCatalog.java | 8 ++ .../sql/connector/catalog/CatalogUtils.scala | 58 ++++++++++ .../paimon/spark/PaimonHiveTestBase.scala | 2 +- 6 files changed, 129 insertions(+), 55 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogUtils.scala diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 359f04ff28be..7e77d55a99ab 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -63,12 +63,11 @@ public class SparkCatalog extends SparkBaseCatalog { private static final String PRIMARY_KEY_IDENTIFIER = "primary-key"; - private String name = null; protected Catalog catalog = null; @Override public void initialize(String name, CaseInsensitiveStringMap options) { - this.name = name; + this.catalogName = name; CatalogContext catalogContext = CatalogContext.create( Options.fromMap(options), @@ -87,11 +86,6 @@ public Catalog paimonCatalog() { return catalog; } - @Override - public String name() { - return name; - } - @Override public String[] defaultNamespace() { return new String[] {Catalog.DEFAULT_DATABASE}; diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java index d2be72706180..d72f628cbdde 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java @@ -24,6 +24,13 @@ /** Options for spark connector. */ public class SparkConnectorOptions { + + public static final ConfigOption INNER_SESSION_CATALOG = + key("catalog.inner-session-catalog") + .booleanType() + .defaultValue(false) + .withDescription( + "If true, create and use an inner session catalog instead of default session catalog when use SparkGenericCatalog."); public static final ConfigOption MERGE_SCHEMA = key("write.merge-schema") .booleanType() diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index 7fe9d7a6838a..4597a7452103 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -25,6 +25,7 @@ import org.apache.paimon.utils.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; @@ -32,9 +33,12 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.catalog.ExternalCatalog; import org.apache.spark.sql.catalyst.catalog.InMemoryCatalog; +import org.apache.spark.sql.catalyst.catalog.SessionCatalog; import org.apache.spark.sql.connector.catalog.CatalogExtension; import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.CatalogUtils; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.NamespaceChange; import org.apache.spark.sql.connector.catalog.SupportsNamespaces; @@ -43,7 +47,9 @@ import org.apache.spark.sql.connector.catalog.TableChange; import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog; import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.internal.SessionState; import org.apache.spark.sql.internal.StaticSQLConf; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -56,7 +62,6 @@ import static org.apache.paimon.options.CatalogOptions.METASTORE; import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; -import static org.apache.paimon.utils.Preconditions.checkNotNull; /* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for @@ -66,19 +71,18 @@ * A Spark catalog that can also load non-Paimon tables. * *

Most of the content of this class is referenced from Iceberg's SparkSessionCatalog. - * - * @param CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces. */ -public class SparkGenericCatalog - extends SparkBaseCatalog implements CatalogExtension { +public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExtension { private static final Logger LOG = LoggerFactory.getLogger(SparkGenericCatalog.class); private static final String[] DEFAULT_NAMESPACE = new String[] {"default"}; - private String catalogName = null; private SparkCatalog sparkCatalog = null; - private T sessionCatalog = null; + + private boolean enableInnerSessionCatalog = false; + + private CatalogPlugin sessionCatalog = null; @Override public Catalog paimonCatalog() { @@ -92,47 +96,47 @@ public String[] defaultNamespace() { @Override public String[][] listNamespaces() throws NoSuchNamespaceException { - return getSessionCatalog().listNamespaces(); + return asNamespaceCatalog().listNamespaces(); } @Override public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException { - return getSessionCatalog().listNamespaces(namespace); + return asNamespaceCatalog().listNamespaces(namespace); } @Override public boolean namespaceExists(String[] namespace) { - return getSessionCatalog().namespaceExists(namespace); + return asNamespaceCatalog().namespaceExists(namespace); } @Override public Map loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException { - return getSessionCatalog().loadNamespaceMetadata(namespace); + return asNamespaceCatalog().loadNamespaceMetadata(namespace); } @Override public void createNamespace(String[] namespace, Map metadata) throws NamespaceAlreadyExistsException { - getSessionCatalog().createNamespace(namespace, metadata); + asNamespaceCatalog().createNamespace(namespace, metadata); } @Override public void alterNamespace(String[] namespace, NamespaceChange... changes) throws NoSuchNamespaceException { - getSessionCatalog().alterNamespace(namespace, changes); + asNamespaceCatalog().alterNamespace(namespace, changes); } @Override public boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException { - return getSessionCatalog().dropNamespace(namespace, cascade); + return asNamespaceCatalog().dropNamespace(namespace, cascade); } @Override public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { // delegate to the session catalog because all tables share the same namespace - return getSessionCatalog().listTables(namespace); + return asTableCatalog().listTables(namespace); } @Override @@ -140,7 +144,7 @@ public Table loadTable(Identifier ident) throws NoSuchTableException { try { return sparkCatalog.loadTable(ident); } catch (NoSuchTableException e) { - return throwsOldIfExceptionHappens(() -> getSessionCatalog().loadTable(ident), e); + return throwsOldIfExceptionHappens(() -> asTableCatalog().loadTable(ident), e); } } @@ -149,8 +153,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep try { return sparkCatalog.loadTable(ident, version); } catch (NoSuchTableException e) { - return throwsOldIfExceptionHappens( - () -> getSessionCatalog().loadTable(ident, version), e); + return throwsOldIfExceptionHappens(() -> asTableCatalog().loadTable(ident, version), e); } } @@ -160,7 +163,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep return sparkCatalog.loadTable(ident, timestamp); } catch (NoSuchTableException e) { return throwsOldIfExceptionHappens( - () -> getSessionCatalog().loadTable(ident, timestamp), e); + () -> asTableCatalog().loadTable(ident, timestamp), e); } } @@ -169,7 +172,7 @@ public void invalidateTable(Identifier ident) { // We do not need to check whether the table exists and whether // it is an Paimon table to reduce remote service requests. sparkCatalog.invalidateTable(ident); - getSessionCatalog().invalidateTable(ident); + asTableCatalog().invalidateTable(ident); } @Override @@ -184,7 +187,7 @@ public Table createTable( return sparkCatalog.createTable(ident, schema, partitions, properties); } else { // delegate to the session catalog - return getSessionCatalog().createTable(ident, schema, partitions, properties); + return asTableCatalog().createTable(ident, schema, partitions, properties); } } @@ -193,18 +196,18 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT if (sparkCatalog.tableExists(ident)) { return sparkCatalog.alterTable(ident, changes); } else { - return getSessionCatalog().alterTable(ident, changes); + return asTableCatalog().alterTable(ident, changes); } } @Override public boolean dropTable(Identifier ident) { - return sparkCatalog.dropTable(ident) || getSessionCatalog().dropTable(ident); + return sparkCatalog.dropTable(ident) || asTableCatalog().dropTable(ident); } @Override public boolean purgeTable(Identifier ident) { - return sparkCatalog.purgeTable(ident) || getSessionCatalog().purgeTable(ident); + return sparkCatalog.purgeTable(ident) || asTableCatalog().purgeTable(ident); } @Override @@ -213,13 +216,15 @@ public void renameTable(Identifier from, Identifier to) if (sparkCatalog.tableExists(from)) { sparkCatalog.renameTable(from, to); } else { - getSessionCatalog().renameTable(from, to); + asTableCatalog().renameTable(from, to); } } @Override public final void initialize(String name, CaseInsensitiveStringMap options) { - Configuration hadoopConf = SparkSession.active().sessionState().newHadoopConf(); + SessionState sessionState = SparkSession.active().sessionState(); + Configuration hadoopConf = sessionState.newHadoopConf(); + SparkConf sparkConf = new SparkConf(); if (options.containsKey(METASTORE.key()) && options.get(METASTORE.key()).equalsIgnoreCase("hive")) { String uri = options.get(CatalogOptions.URI.key()); @@ -242,10 +247,20 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { this.catalogName = name; this.sparkCatalog = new SparkCatalog(); - this.sparkCatalog.initialize( - name, - autoFillConfigurations( - options, SparkSession.active().sessionState().conf(), hadoopConf)); + CaseInsensitiveStringMap newOptions = + autoFillConfigurations(options, sessionState.conf(), hadoopConf); + sparkCatalog.initialize(name, newOptions); + + if (options.getBoolean(SparkConnectorOptions.INNER_SESSION_CATALOG.key(), false)) { + this.enableInnerSessionCatalog = true; + for (Map.Entry entry : options.entrySet()) { + sparkConf.set("spark.hadoop." + entry.getKey(), entry.getValue()); + hadoopConf.set(entry.getKey(), entry.getValue()); + } + ExternalCatalog externalCatalog = + CatalogUtils.buildExternalCatalog(sparkConf, hadoopConf); + this.sessionCatalog = new V2SessionCatalog(new SessionCatalog(externalCatalog)); + } } private CaseInsensitiveStringMap autoFillConfigurations( @@ -282,30 +297,22 @@ private void fillCommonConfigurations(Map options, SQLConf sqlCo @Override @SuppressWarnings("unchecked") - public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) { - if (sparkSessionCatalog instanceof TableCatalog - && sparkSessionCatalog instanceof SupportsNamespaces) { - this.sessionCatalog = (T) sparkSessionCatalog; - } else { - throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog); + public void setDelegateCatalog(CatalogPlugin delegate) { + if (!enableInnerSessionCatalog) { + this.sessionCatalog = delegate; } } - @Override - public String name() { - return catalogName; - } - private boolean usePaimon(String provider) { return provider == null || SparkSource.NAME().equalsIgnoreCase(provider); } - private T getSessionCatalog() { - checkNotNull( - sessionCatalog, - "Delegated SessionCatalog is missing. " - + "Please make sure your are replacing Spark's default catalog, named 'spark_catalog'."); - return sessionCatalog; + private TableCatalog asTableCatalog() { + return (TableCatalog) sessionCatalog; + } + + private SupportsNamespaces asNamespaceCatalog() { + return (SupportsNamespaces) sessionCatalog; } @Override diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java index 3a18277de0e4..2f5267029560 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java @@ -31,6 +31,14 @@ /** Spark base catalog. */ public abstract class SparkBaseCatalog implements TableCatalog, SupportsNamespaces, ProcedureCatalog, WithPaimonCatalog { + + protected String catalogName; + + @Override + public String name() { + return catalogName; + } + @Override public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException { if (Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogUtils.scala new file mode 100644 index 000000000000..283fbf25427b --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogUtils.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.ExternalCatalog +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.util.Utils + +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +object CatalogUtils { + + def buildExternalCatalog(conf: SparkConf, hadoopConf: Configuration): ExternalCatalog = { + val externalCatalogClassName = + if (SparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) { + "org.apache.spark.sql.hive.HiveExternalCatalog" + } else { + "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog" + } + reflect[ExternalCatalog, SparkConf, Configuration](externalCatalogClassName, conf, hadoopConf) + } + + private def reflect[T, Arg1 <: AnyRef, Arg2 <: AnyRef]( + className: String, + ctorArg1: Arg1, + ctorArg2: Arg2)(implicit ctorArgTag1: ClassTag[Arg1], ctorArgTag2: ClassTag[Arg2]): T = { + try { + val clazz = Utils.classForName(className) + val ctor = clazz.getDeclaredConstructor(ctorArgTag1.runtimeClass, ctorArgTag2.runtimeClass) + val args = Array[AnyRef](ctorArg1, ctorArg2) + ctor.newInstance(args: _*).asInstanceOf[T] + } catch { + case NonFatal(e) => + throw new IllegalArgumentException(s"Error while instantiating '$className':", e) + } + } + +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala index abc1ecd81210..3b07573efdcf 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala @@ -43,7 +43,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase { super.sparkConf .set("spark.sql.warehouse.dir", tempHiveDBDir.getCanonicalPath) .set("spark.sql.catalogImplementation", "hive") - .set("spark.sql.catalog.spark_catalog", classOf[SparkGenericCatalog[_]].getName) + .set("spark.sql.catalog.spark_catalog", "org.apache.paimon.spark.SparkGenericCatalog") .set(s"spark.sql.catalog.$paimonHiveCatalogName", classOf[SparkCatalog].getName) .set(s"spark.sql.catalog.$paimonHiveCatalogName.metastore", "hive") .set(s"spark.sql.catalog.$paimonHiveCatalogName.warehouse", tempHiveDBDir.getCanonicalPath)