+ List<Row> rows = spark.sql("SELECT * FROM CT").collectAsList();
+ assertThat(rows.stream().map(Object::toString))
+ .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
+ }
+
+ private static void writeTable(String tableName, GenericRow... rows) throws Exception {
+ FileStoreTable fileStoreTable =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(warehousePath, String.format("default.db/%s", tableName)));
+ BatchWriteBuilder writeBuilder = fileStoreTable.newBatchWriteBuilder();
+ BatchTableWrite writer = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit();
+ for (GenericRow row : rows) {
+ writer.write(row);
+ }
+ commit.commit(writer.prepareCommit());
+ writer.close();
+ }
+}
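
For context, a minimal sketch of how the writeTable helper above is driven, assuming the (a INT, b INT, c STRING) schema used in these tests; GenericRow.of and BinaryString.fromString are Paimon's in-memory row and string constructors, and the table name is illustrative:

    // Sketch only: seeds the table that the assertion above reads back.
    // Assumes this lives in the same test class as writeTable.
    private static void seedTable() throws Exception {
        writeTable(
                "CT",
                GenericRow.of(1, 2, BinaryString.fromString("3")),
                GenericRow.of(4, 5, BinaryString.fromString("6")));
    }
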
diff --git a/paimon-spark/paimon-spark-3.1/src/test/resources/log4j2-test.properties b/paimon-spark/paimon-spark-3.1/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000000..1b3980d15104
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.1/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
index 66138b7269b2..dbd4a937c4f5 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
@@ -79,6 +79,10 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
Options.fromMap(options),
SparkSession.active().sessionState().newHadoopConf());
this.catalog = CatalogFactory.createCatalog(catalogContext);
+ try {
+ createNamespace(defaultNamespace(), new HashMap<>());
+ } catch (NamespaceAlreadyExistsException ignored) {
+ }
}
@Override
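
The hunk above makes initialize() create the catalog's default namespace eagerly, swallowing NamespaceAlreadyExistsException; this is also why the explicit "CREATE NAMESPACE default" is removed from SparkReadTestBase at the end of this patch. A hedged sketch of the resulting behavior (warehouse path illustrative):

    // Sketch (assumes an active SparkSession 'spark'): after this change a
    // freshly configured Paimon catalog is usable immediately, with no
    // manual namespace bootstrap.
    spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName());
    spark.conf().set("spark.sql.catalog.paimon.warehouse", "file:/tmp/warehouse");
    spark.sql("USE paimon");
    spark.sql("CREATE TABLE t (a INT) USING paimon"); // lands in 'default'
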
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
new file mode 100644
index 000000000000..1f819dd034e0
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.CatalogExtension;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.options.CatalogOptions.METASTORE;
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * A Spark catalog that can also load non-Paimon tables.
+ *
+ * <p>Most of the content of this class is referenced from Iceberg's SparkSessionCatalog.
+ *
+ * @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
+ */
+public class SparkGenericCatalog<T extends TableCatalog & SupportsNamespaces>
+ implements CatalogExtension {
+
+ private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
+
+ private String catalogName = null;
+ private SparkCatalog paimonCatalog = null;
+ private T sessionCatalog = null;
+
+ @Override
+ public String[] defaultNamespace() {
+ return DEFAULT_NAMESPACE;
+ }
+
+ @Override
+ public String[][] listNamespaces() throws NoSuchNamespaceException {
+ return getSessionCatalog().listNamespaces();
+ }
+
+ @Override
+ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+ return getSessionCatalog().listNamespaces(namespace);
+ }
+
+ @Override
+ public boolean namespaceExists(String[] namespace) {
+ return getSessionCatalog().namespaceExists(namespace);
+ }
+
+ @Override
+ public Map<String, String> loadNamespaceMetadata(String[] namespace)
+ throws NoSuchNamespaceException {
+ return getSessionCatalog().loadNamespaceMetadata(namespace);
+ }
+
+ @Override
+ public void createNamespace(String[] namespace, Map<String, String> metadata)
+ throws NamespaceAlreadyExistsException {
+ getSessionCatalog().createNamespace(namespace, metadata);
+ }
+
+ @Override
+ public void alterNamespace(String[] namespace, NamespaceChange... changes)
+ throws NoSuchNamespaceException {
+ getSessionCatalog().alterNamespace(namespace, changes);
+ }
+
+ @Override
+ public boolean dropNamespace(String[] namespace, boolean cascade)
+ throws NoSuchNamespaceException, NonEmptyNamespaceException {
+ return getSessionCatalog().dropNamespace(namespace, cascade);
+ }
+
+ @Override
+ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
+ // delegate to the session catalog because all tables share the same namespace
+ return getSessionCatalog().listTables(namespace);
+ }
+
+ @Override
+ public Table loadTable(Identifier ident) throws NoSuchTableException {
+ try {
+ return paimonCatalog.loadTable(ident);
+ } catch (NoSuchTableException e) {
+ return getSessionCatalog().loadTable(ident);
+ }
+ }
+
+ @Override
+ public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
+ try {
+ return paimonCatalog.loadTable(ident, version);
+ } catch (NoSuchTableException e) {
+ return getSessionCatalog().loadTable(ident, version);
+ }
+ }
+
+ @Override
+ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+ try {
+ return paimonCatalog.loadTable(ident, timestamp);
+ } catch (NoSuchTableException e) {
+ return getSessionCatalog().loadTable(ident, timestamp);
+ }
+ }
+
+ @Override
+ public void invalidateTable(Identifier ident) {
+ // To reduce remote service requests, we do not check whether the
+ // table exists or whether it is a Paimon table.
+ paimonCatalog.invalidateTable(ident);
+ getSessionCatalog().invalidateTable(ident);
+ }
+
+ @Override
+ public Table createTable(
+ Identifier ident,
+ StructType schema,
+ Transform[] partitions,
+ Map<String, String> properties)
+ throws TableAlreadyExistsException, NoSuchNamespaceException {
+ String provider = properties.get("provider");
+ if (usePaimon(provider)) {
+ return paimonCatalog.createTable(ident, schema, partitions, properties);
+ } else {
+ // delegate to the session catalog
+ return getSessionCatalog().createTable(ident, schema, partitions, properties);
+ }
+ }
+
+ @Override
+ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+ if (paimonCatalog.tableExists(ident)) {
+ return paimonCatalog.alterTable(ident, changes);
+ } else {
+ return getSessionCatalog().alterTable(ident, changes);
+ }
+ }
+
+ @Override
+ public boolean dropTable(Identifier ident) {
+ return paimonCatalog.dropTable(ident) || getSessionCatalog().dropTable(ident);
+ }
+
+ @Override
+ public boolean purgeTable(Identifier ident) {
+ return paimonCatalog.purgeTable(ident) || getSessionCatalog().purgeTable(ident);
+ }
+
+ @Override
+ public void renameTable(Identifier from, Identifier to)
+ throws NoSuchTableException, TableAlreadyExistsException {
+ if (paimonCatalog.tableExists(from)) {
+ paimonCatalog.renameTable(from, to);
+ } else {
+ getSessionCatalog().renameTable(from, to);
+ }
+ }
+
+ @Override
+ public final void initialize(String name, CaseInsensitiveStringMap options) {
+ if (options.containsKey(METASTORE.key())
+ && options.get(METASTORE.key()).equalsIgnoreCase("hive")) {
+ String uri = options.get(CatalogOptions.URI.key());
+ if (uri != null) {
+ Configuration conf = SparkSession.active().sessionState().newHadoopConf();
+ String envHmsUri = conf.get("hive.metastore.uris", null);
+ if (envHmsUri != null) {
+ Preconditions.checkArgument(
+ uri.equals(envHmsUri),
+ "Inconsistent Hive metastore URIs: %s (Spark session) != %s (spark_catalog)",
+ envHmsUri,
+ uri);
+ }
+ }
+ }
+
+ this.catalogName = name;
+ this.paimonCatalog = new SparkCatalog();
+
+ if (!options.containsKey(WAREHOUSE.key())) {
+ Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
+ String warehouse = SparkSession.active().sessionState().conf().warehousePath();
+ newOptions.put(WAREHOUSE.key(), warehouse);
+ options = new CaseInsensitiveStringMap(newOptions);
+ }
+ this.paimonCatalog.initialize(name, options);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
+ if (sparkSessionCatalog instanceof TableCatalog
+ && sparkSessionCatalog instanceof SupportsNamespaces) {
+ this.sessionCatalog = (T) sparkSessionCatalog;
+ } else {
+ throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog);
+ }
+ }
+
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
+ private boolean usePaimon(String provider) {
+ return provider == null || "paimon".equalsIgnoreCase(provider);
+ }
+
+ private T getSessionCatalog() {
+ checkNotNull(
+ sessionCatalog,
+ "Delegated SessionCatalog is missing. "
+ + "Please make sure your are replacing Spark's default catalog, named 'spark_catalog'.");
+ return sessionCatalog;
+ }
+
+ @Override
+ public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+ if (namespace.length == 0 || isSystemNamespace(namespace) || namespaceExists(namespace)) {
+ return new Identifier[0];
+ }
+
+ throw new NoSuchNamespaceException(namespace);
+ }
+
+ @Override
+ public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+ throw new NoSuchFunctionException(ident);
+ }
+
+ private static boolean isSystemNamespace(String[] namespace) {
+ return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
+ }
+}
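
For reference, a minimal self-contained sketch of wiring SparkGenericCatalog in as Spark's session catalog, mirroring the test below (the warehouse path is illustrative; when no 'warehouse' option is set, initialize() falls back to spark.sql.warehouse.dir as implemented above):

    // Sketch only: registering SparkGenericCatalog as 'spark_catalog' so
    // Paimon tables and ordinary Spark tables (e.g. CSV) coexist.
    import org.apache.spark.sql.SparkSession;

    public class GenericCatalogExample {
        public static void main(String[] args) {
            SparkSession spark =
                    SparkSession.builder()
                            .master("local[2]")
                            .config("spark.sql.warehouse.dir", "file:/tmp/warehouse")
                            .config(
                                    "spark.sql.catalog.spark_catalog",
                                    "org.apache.paimon.spark.SparkGenericCatalog")
                            .getOrCreate();
            spark.sql("CREATE TABLE pt (a INT) USING paimon"); // Paimon catalog
            spark.sql("CREATE TABLE ct (a INT) USING csv"); // session catalog
            spark.stop();
        }
    }
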
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
new file mode 100644
index 000000000000..aa14bf72e13f
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.fs.Path;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SparkGenericCatalog}. */
+public class SparkGenericCatalogTest {
+
+ protected static SparkSession spark = null;
+
+ protected static Path warehousePath = null;
+
+ @BeforeAll
+ public static void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
+ warehousePath = new Path("file:" + tempDir.toString());
+ spark =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir", warehousePath.toString())
+ .master("local[2]")
+ .getOrCreate();
+ spark.conf().set("spark.sql.catalog.spark_catalog", SparkGenericCatalog.class.getName());
+ }
+
+ @AfterAll
+ public static void stopMetastoreAndSpark() {
+ if (spark != null) {
+ spark.stop();
+ spark = null;
+ }
+ }
+
+ @Test
+ public void testPaimonTable() {
+ spark.sql(
+ "CREATE TABLE PT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+ + " ('file.format'='avro')");
+ testReadWrite("PT");
+
+ spark.sql("CREATE DATABASE my_db");
+ spark.sql(
+ "CREATE TABLE DB_PT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+ + " ('file.format'='avro')");
+ testReadWrite("DB_PT");
+
+ assertThat(spark.sql("SHOW NAMESPACES").collectAsList().stream().map(Object::toString))
+ .containsExactlyInAnyOrder("[default]", "[my_db]");
+ }
+
+ @Test
+ public void testCsvTable() {
+ spark.sql("CREATE TABLE CT (a INT, b INT, c STRING) USING csv");
+ testReadWrite("CT");
+ }
+
+ private void testReadWrite(String table) {
+ spark.sql("INSERT INTO " + table + " VALUES (1, 2, '3'), (4, 5, '6')").collectAsList();
+ List<Row> rows = spark.sql("SELECT * FROM " + table).collectAsList();
+ assertThat(rows.stream().map(Object::toString))
+ .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 191b09d5e82e..50985c9c9bf3 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -50,7 +50,7 @@
/** Base tests for spark read. */
public abstract class SparkReadTestBase {
- private static final String COMMIT_USER = "user";
+
private static final AtomicLong COMMIT_IDENTIFIER = new AtomicLong(0);
protected static SparkSession spark = null;
@@ -68,7 +68,6 @@ public static void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName());
spark.conf().set("spark.sql.catalog.paimon.warehouse", warehousePath.toString());
spark.sql("USE paimon");
- spark.sql("CREATE NAMESPACE default");
}
@AfterAll