[spark] use the internal session catalog in spark generic catalog #2959

Merged · 3 commits · Mar 8, 2024
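Summary: this PR lets SparkGenericCatalog create and use its own internal session catalog instead of relying solely on the delegate Spark injects. It adds the option catalog.create-underlying-session-catalog (default false); when enabled, initialize() builds a V2SessionCatalog over a freshly constructed ExternalCatalog (via a new CatalogUtils Scala shim) and the delegate passed to setDelegateCatalog() is ignored. It also hoists catalogName/name() into SparkBaseCatalog and drops the generic type parameter on SparkGenericCatalog in favor of a plain CatalogPlugin field.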
File: docs (Spark connector configuration table)
@@ -74,5 +74,11 @@
<td>Boolean</td>
<td>If true, allow to merge data types if the two types meet the rules for explicit casting.</td>
</tr>
+ <tr>
+ <td><h5>catalog.create-underlying-session-catalog</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, create and use an underlying session catalog instead of the default session catalog when using SparkGenericCatalog.</td>
+ </tr>
</tbody>
</table>
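Since the option is read from the catalog's own option map, it is set wherever SparkGenericCatalog is registered as the spark_catalog replacement. A minimal usage sketch in Java (the master, warehouse path, and table names are illustrative assumptions, not taken from this PR):

    import org.apache.spark.sql.SparkSession;

    public class GenericCatalogExample {
        public static void main(String[] args) {
            SparkSession spark =
                    SparkSession.builder()
                            .master("local[2]") // assumption: local test run
                            .config(
                                    "spark.sql.catalog.spark_catalog",
                                    "org.apache.paimon.spark.SparkGenericCatalog")
                            .config(
                                    "spark.sql.catalog.spark_catalog.warehouse",
                                    "file:/tmp/paimon-warehouse") // illustrative path
                            .config(
                                    "spark.sql.catalog.spark_catalog.catalog.create-underlying-session-catalog",
                                    "true")
                            .getOrCreate();

            // Paimon tables are handled by the embedded SparkCatalog ...
            spark.sql("CREATE TABLE t_paimon (id INT, v STRING) USING paimon");
            // ... while non-Paimon tables go to the (now internal) session catalog.
            spark.sql("CREATE TABLE t_other (id INT, v STRING) USING parquet");

            spark.stop();
        }
    }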
File: SparkCatalog.java
@@ -63,12 +63,11 @@ public class SparkCatalog extends SparkBaseCatalog {

private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";

- private String name = null;
protected Catalog catalog = null;

@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
- this.name = name;
+ this.catalogName = name;
CatalogContext catalogContext =
CatalogContext.create(
Options.fromMap(options),
@@ -87,11 +86,6 @@ public Catalog paimonCatalog() {
return catalog;
}

- @Override
- public String name() {
- return name;
- }

@Override
public String[] defaultNamespace() {
return new String[] {Catalog.DEFAULT_DATABASE};
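(Note: the name field and name() override removed above are replaced by the shared catalogName field and name() implementation this PR adds to SparkBaseCatalog; see the SparkBaseCatalog.java hunk below.)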
File: SparkConnectorOptions.java
@@ -24,6 +24,13 @@

/** Options for spark connector. */
public class SparkConnectorOptions {

+ public static final ConfigOption<Boolean> CREATE_UNDERLYING_SESSION_CATALOG =
+ key("catalog.create-underlying-session-catalog")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, create and use an underlying session catalog instead of the default session catalog when using SparkGenericCatalog.");
public static final ConfigOption<Boolean> MERGE_SCHEMA =
key("write.merge-schema")
.booleanType()
File: SparkGenericCatalog.java
@@ -25,16 +25,20 @@
import org.apache.paimon.utils.Preconditions;

import org.apache.hadoop.conf.Configuration;
+ import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+ import org.apache.spark.sql.catalyst.catalog.ExternalCatalog;
+ import org.apache.spark.sql.catalyst.catalog.InMemoryCatalog;
+ import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
import org.apache.spark.sql.connector.catalog.CatalogExtension;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+ import org.apache.spark.sql.connector.catalog.CatalogUtils;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
@@ -43,7 +47,9 @@
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.expressions.Transform;
+ import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog;
import org.apache.spark.sql.internal.SQLConf;
+ import org.apache.spark.sql.internal.SessionState;
import org.apache.spark.sql.internal.StaticSQLConf;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -56,7 +62,6 @@

import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
- import static org.apache.paimon.utils.Preconditions.checkNotNull;

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -66,19 +71,18 @@
* A Spark catalog that can also load non-Paimon tables.
*
* <p>Most of the content of this class is referenced from Iceberg's SparkSessionCatalog.
- *
- * @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
 */
- public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
-         extends SparkBaseCatalog implements CatalogExtension {
+ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExtension {

private static final Logger LOG = LoggerFactory.getLogger(SparkGenericCatalog.class);

private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};

- private String catalogName = null;
private SparkCatalog sparkCatalog = null;
- private T sessionCatalog = null;

+ private boolean underlyingSessionCatalogEnabled = false;

+ private CatalogPlugin sessionCatalog = null;

@Override
public Catalog paimonCatalog() {
@@ -92,55 +96,55 @@ public String[] defaultNamespace() {

@Override
public String[][] listNamespaces() throws NoSuchNamespaceException {
- return getSessionCatalog().listNamespaces();
+ return asNamespaceCatalog().listNamespaces();
}

@Override
public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
- return getSessionCatalog().listNamespaces(namespace);
+ return asNamespaceCatalog().listNamespaces(namespace);
}

@Override
public boolean namespaceExists(String[] namespace) {
- return getSessionCatalog().namespaceExists(namespace);
+ return asNamespaceCatalog().namespaceExists(namespace);
}

@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException {
- return getSessionCatalog().loadNamespaceMetadata(namespace);
+ return asNamespaceCatalog().loadNamespaceMetadata(namespace);
}

@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
- getSessionCatalog().createNamespace(namespace, metadata);
+ asNamespaceCatalog().createNamespace(namespace, metadata);
}

@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
- getSessionCatalog().alterNamespace(namespace, changes);
+ asNamespaceCatalog().alterNamespace(namespace, changes);
}

@Override
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException, NonEmptyNamespaceException {
- return getSessionCatalog().dropNamespace(namespace, cascade);
+ return asNamespaceCatalog().dropNamespace(namespace, cascade);
}

@Override
public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
// delegate to the session catalog because all tables share the same namespace
- return getSessionCatalog().listTables(namespace);
+ return asTableCatalog().listTables(namespace);
}

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
try {
return sparkCatalog.loadTable(ident);
} catch (NoSuchTableException e) {
- return throwsOldIfExceptionHappens(() -> getSessionCatalog().loadTable(ident), e);
+ return throwsOldIfExceptionHappens(() -> asTableCatalog().loadTable(ident), e);
}
}
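The helper throwsOldIfExceptionHappens is not shown in this diff. Judging from its call sites, it runs the session-catalog fallback and, if that also fails, rethrows the original exception from the Paimon catalog. A plausible shape, as an assumption rather than the PR's actual code:

    import java.util.concurrent.Callable;

    // Assumed helper: try the fallback lookup; on any failure, surface the
    // original NoSuchTableException thrown by the Paimon-side catalog.
    private Table throwsOldIfExceptionHappens(Callable<Table> fallback, NoSuchTableException origin)
            throws NoSuchTableException {
        try {
            return fallback.call();
        } catch (Exception ignored) {
            throw origin;
        }
    }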

@@ -149,8 +153,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
try {
return sparkCatalog.loadTable(ident, version);
} catch (NoSuchTableException e) {
- return throwsOldIfExceptionHappens(
- () -> getSessionCatalog().loadTable(ident, version), e);
+ return throwsOldIfExceptionHappens(() -> asTableCatalog().loadTable(ident, version), e);
}
}

@@ -160,7 +163,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
return sparkCatalog.loadTable(ident, timestamp);
} catch (NoSuchTableException e) {
return throwsOldIfExceptionHappens(
- () -> getSessionCatalog().loadTable(ident, timestamp), e);
+ () -> asTableCatalog().loadTable(ident, timestamp), e);
}
}

@@ -169,7 +172,7 @@ public void invalidateTable(Identifier ident) {
// We do not need to check whether the table exists and whether
// it is a Paimon table to reduce remote service requests.
sparkCatalog.invalidateTable(ident);
- getSessionCatalog().invalidateTable(ident);
+ asTableCatalog().invalidateTable(ident);
}

@Override
@@ -184,7 +187,7 @@ public Table createTable(
return sparkCatalog.createTable(ident, schema, partitions, properties);
} else {
// delegate to the session catalog
- return getSessionCatalog().createTable(ident, schema, partitions, properties);
+ return asTableCatalog().createTable(ident, schema, partitions, properties);
}
}

@@ -193,18 +196,18 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
if (sparkCatalog.tableExists(ident)) {
return sparkCatalog.alterTable(ident, changes);
} else {
- return getSessionCatalog().alterTable(ident, changes);
+ return asTableCatalog().alterTable(ident, changes);
}
}

@Override
public boolean dropTable(Identifier ident) {
- return sparkCatalog.dropTable(ident) || getSessionCatalog().dropTable(ident);
+ return sparkCatalog.dropTable(ident) || asTableCatalog().dropTable(ident);
}

@Override
public boolean purgeTable(Identifier ident) {
- return sparkCatalog.purgeTable(ident) || getSessionCatalog().purgeTable(ident);
+ return sparkCatalog.purgeTable(ident) || asTableCatalog().purgeTable(ident);
}

@Override
@@ -213,13 +216,15 @@ public void renameTable(Identifier from, Identifier to)
if (sparkCatalog.tableExists(from)) {
sparkCatalog.renameTable(from, to);
} else {
- getSessionCatalog().renameTable(from, to);
+ asTableCatalog().renameTable(from, to);
}
}

@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {
- Configuration hadoopConf = SparkSession.active().sessionState().newHadoopConf();
+ SessionState sessionState = SparkSession.active().sessionState();
+ Configuration hadoopConf = sessionState.newHadoopConf();
+ SparkConf sparkConf = new SparkConf();
if (options.containsKey(METASTORE.key())
&& options.get(METASTORE.key()).equalsIgnoreCase("hive")) {
String uri = options.get(CatalogOptions.URI.key());
@@ -242,10 +247,21 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
this.sparkCatalog = new SparkCatalog();

- this.sparkCatalog.initialize(
- name,
- autoFillConfigurations(
- options, SparkSession.active().sessionState().conf(), hadoopConf));
+ CaseInsensitiveStringMap newOptions =
+ autoFillConfigurations(options, sessionState.conf(), hadoopConf);
+ sparkCatalog.initialize(name, newOptions);

+ if (options.getBoolean(
+ SparkConnectorOptions.CREATE_UNDERLYING_SESSION_CATALOG.key(), false)) {
+ this.underlyingSessionCatalogEnabled = true;
+ for (Map.Entry<String, String> entry : options.entrySet()) {
+ sparkConf.set("spark.hadoop." + entry.getKey(), entry.getValue());
+ hadoopConf.set(entry.getKey(), entry.getValue());
+ }
+ ExternalCatalog externalCatalog =
+ CatalogUtils.buildExternalCatalog(sparkConf, hadoopConf);
+ this.sessionCatalog = new V2SessionCatalog(new SessionCatalog(externalCatalog));
+ }
}

private CaseInsensitiveStringMap autoFillConfigurations(
@@ -282,30 +298,22 @@ private void fillCommonConfigurations(Map<String, String> options, SQLConf sqlConf) {

@Override
@SuppressWarnings("unchecked")
- public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
- if (sparkSessionCatalog instanceof TableCatalog
- && sparkSessionCatalog instanceof SupportsNamespaces) {
- this.sessionCatalog = (T) sparkSessionCatalog;
- } else {
- throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog);
+ public void setDelegateCatalog(CatalogPlugin delegate) {
+ if (!underlyingSessionCatalogEnabled) {
+ this.sessionCatalog = delegate;
}
}

- @Override
- public String name() {
- return catalogName;
- }

private boolean usePaimon(String provider) {
return provider == null || SparkSource.NAME().equalsIgnoreCase(provider);
}

- private T getSessionCatalog() {
- checkNotNull(
- sessionCatalog,
- "Delegated SessionCatalog is missing. "
- + "Please make sure your are replacing Spark's default catalog, named 'spark_catalog'.");
- return sessionCatalog;
- }
+ private TableCatalog asTableCatalog() {
+ return (TableCatalog) sessionCatalog;
+ }

+ private SupportsNamespaces asNamespaceCatalog() {
+ return (SupportsNamespaces) sessionCatalog;
+ }

@Override
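Taken together, the catalog now has two wiring modes. With catalog.create-underlying-session-catalog unset, Spark injects its built-in V2SessionCatalog through setDelegateCatalog(), per the CatalogExtension contract, exactly as before. With it set, initialize() eagerly builds an in-process SessionCatalog over a fresh ExternalCatalog (Hive or in-memory, following spark.sql.catalogImplementation) and setDelegateCatalog() becomes a no-op; this works because Spark initializes the plugin before injecting the delegate. In both modes the delegate is a V2SessionCatalog, which implements both TableCatalog and SupportsNamespaces, so the plain casts in asTableCatalog()/asNamespaceCatalog() can safely replace the old generic bound <T extends TableCatalog & SupportsNamespaces>. Note that the null-check from the removed getSessionCatalog() (with its hint about replacing 'spark_catalog') is gone, so a misconfigured catalog name now surfaces as a NullPointerException or ClassCastException rather than the earlier descriptive error.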
File: SparkBaseCatalog.java
@@ -31,6 +31,14 @@
/** Spark base catalog. */
public abstract class SparkBaseCatalog
implements TableCatalog, SupportsNamespaces, ProcedureCatalog, WithPaimonCatalog {

+ protected String catalogName;

+ @Override
+ public String name() {
+ return catalogName;
+ }

@Override
public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException {
if (Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) {
File: CatalogUtils.scala (new file)
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.Utils

import scala.reflect.ClassTag
import scala.util.control.NonFatal

object CatalogUtils {

def buildExternalCatalog(conf: SparkConf, hadoopConf: Configuration): ExternalCatalog = {
val externalCatalogClassName =
if (SparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) {
"org.apache.spark.sql.hive.HiveExternalCatalog"
} else {
"org.apache.spark.sql.catalyst.catalog.InMemoryCatalog"
}
reflect[ExternalCatalog, SparkConf, Configuration](externalCatalogClassName, conf, hadoopConf)
}

private def reflect[T, Arg1 <: AnyRef, Arg2 <: AnyRef](
className: String,
ctorArg1: Arg1,
ctorArg2: Arg2)(implicit ctorArgTag1: ClassTag[Arg1], ctorArgTag2: ClassTag[Arg2]): T = {
try {
val clazz = Utils.classForName(className)
val ctor = clazz.getDeclaredConstructor(ctorArgTag1.runtimeClass, ctorArgTag2.runtimeClass)
val args = Array[AnyRef](ctorArg1, ctorArg2)
ctor.newInstance(args: _*).asInstanceOf[T]
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}

}
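CatalogUtils instantiates the ExternalCatalog reflectively rather than referencing it directly, presumably because HiveExternalCatalog lives in the spark-hive module and a compile-time reference would add a hard dependency; Spark's own SharedState resolves its external catalog the same way. The object sits in an org.apache.spark package so it can use the private[spark] Utils.classForName. For reference, a Java analogue of the reflect helper (illustrative only, not part of the PR):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SparkConf;

    final class ReflectiveCatalogs {

        // Both ExternalCatalog implementations expose a (SparkConf, Configuration) constructor.
        @SuppressWarnings("unchecked")
        static <T> T instantiate(String className, SparkConf conf, Configuration hadoopConf) {
            try {
                Class<?> clazz = Class.forName(className);
                return (T)
                        clazz.getDeclaredConstructor(SparkConf.class, Configuration.class)
                                .newInstance(conf, hadoopConf);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Error while instantiating '" + className + "':", e);
            }
        }
    }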