From b20907ed8b9a9542901bb34a880b3ff864931e4e Mon Sep 17 00:00:00 2001
From: Kerwin <37063904+zhuangchong@users.noreply.github.com>
Date: Mon, 25 Nov 2024 10:30:22 +0800
Subject: [PATCH] [spark] Support spark to create external table sql
 statements. (#4576)

---
 .../org/apache/paimon/hive/HiveCatalog.java   | 20 ++++++---
 .../spark/SparkCatalogWithHiveTest.java       | 45 +++++++++++++++++++
 2 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index b92c3b59d925..ebd5a1edf89b 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -135,6 +135,7 @@ public class HiveCatalog extends AbstractCatalog {
             "org.apache.paimon.hive.PaimonStorageHandler";
     private static final String HIVE_PREFIX = "hive.";
     public static final String HIVE_SITE_FILE = "hive-site.xml";
+    private static final String HIVE_EXTERNAL_TABLE_PROP = "EXTERNAL";
 
     private final HiveConf hiveConf;
     private final String clientClassName;
@@ -218,7 +219,7 @@ private Pair<Path, Boolean> initialTableLocation(
             externalTable = true;
             location = new Path(tableOptions.get(CoreOptions.PATH.key()));
         } else {
-            externalTable = usingExternalTable();
+            externalTable = usingExternalTable(tableOptions);
             location = getTableLocation(identifier, null);
         }
         return Pair.of(location, externalTable);
@@ -659,12 +660,18 @@ public void createFormatTable(Identifier identifier, Schema schema) {
         }
     }
 
-    private boolean usingExternalTable() {
+    private boolean usingExternalTable(Map<String, String> tableOptions) {
         CatalogTableType tableType =
                 OptionsUtils.convertToEnum(
                         hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
                         CatalogTableType.class);
-        return CatalogTableType.EXTERNAL.equals(tableType);
+
+        String externalPropValue =
+                tableOptions.getOrDefault(
+                        HIVE_EXTERNAL_TABLE_PROP.toLowerCase(),
+                        tableOptions.get(HIVE_EXTERNAL_TABLE_PROP.toUpperCase()));
+        return CatalogTableType.EXTERNAL.equals(tableType)
+                || "TRUE".equalsIgnoreCase(externalPropValue);
     }
 
     @Override
@@ -962,7 +969,10 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
             if (newTable == null) {
                 newTable =
                         createHiveTable(
-                                identifier, tableSchema, location, usingExternalTable());
+                                identifier,
+                                tableSchema,
+                                location,
+                                usingExternalTable(tableSchema.options()));
             }
             Table finalNewTable = newTable;
             clients.execute(client -> client.createTable(finalNewTable));
@@ -1081,7 +1091,7 @@ private Table newHmsTable(
             table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
         }
         if (externalTable) {
-            table.getParameters().put("EXTERNAL", "TRUE");
+            table.getParameters().put(HIVE_EXTERNAL_TABLE_PROP, "TRUE");
         }
         return table;
     }
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 68cf91b8ec7b..45ccd06479f2 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -19,7 +19,9 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.hive.TestHiveMetastore;
+import org.apache.paimon.table.FileStoreTableFactory;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -32,6 +34,7 @@
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base tests for spark read. */
@@ -136,4 +139,46 @@ public void testSpecifyHiveConfDir(@TempDir java.nio.file.Path tempDir) {
 
         spark.close();
     }
+
+    @Test
+    public void testCreateExternalTable(@TempDir java.nio.file.Path tempDir) {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession spark =
+                SparkSession.builder()
+                        .config("spark.sql.warehouse.dir", warehousePath.toString())
+                        // with hive metastore
+                        .config("spark.sql.catalogImplementation", "hive")
+                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
+                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+                        .config(
+                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+                                "thrift://localhost:" + PORT)
+                        .config(
+                                "spark.sql.catalog.spark_catalog.warehouse",
+                                warehousePath.toString())
+                        .master("local[2]")
+                        .getOrCreate();
+
+        spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
+        spark.sql("USE spark_catalog.test_db");
+
+        // create hive external table
+        spark.sql("CREATE EXTERNAL TABLE t1 (a INT, bb INT, c STRING)");
+
+        // drop hive external table
+        spark.sql("DROP TABLE t1");
+
+        // file system table exists
+        assertThatCode(
+                        () ->
+                                FileStoreTableFactory.create(
+                                        LocalFileIO.create(),
+                                        new Path(
+                                                warehousePath,
+                                                String.format("%s.db/%s", "test_db", "t1"))))
+                .doesNotThrowAnyException();
+
+        spark.close();
+    }
 }
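
Note on the change: usingExternalTable now treats a table as external when either the catalog-level table.type setting is external or the table's own options carry the Hive EXTERNAL property set to TRUE, with the key accepted in lower or upper case and the value compared case-insensitively. The sketch below isolates that option lookup so it can be read and run on its own; the class and method names here are illustrative only and are not part of Paimon's API.

import java.util.HashMap;
import java.util.Map;

/**
 * Standalone sketch of the option lookup added by this patch: a table is treated
 * as external when its options carry EXTERNAL=TRUE under either key casing.
 * Illustrative names only; not Paimon code.
 */
public class ExternalOptionCheck {

    private static final String HIVE_EXTERNAL_TABLE_PROP = "EXTERNAL";

    /** Mirrors the table-options half of usingExternalTable(Map) in HiveCatalog. */
    static boolean externalFromOptions(Map<String, String> tableOptions) {
        // Prefer the lower-case key, fall back to the upper-case one.
        String externalPropValue =
                tableOptions.getOrDefault(
                        HIVE_EXTERNAL_TABLE_PROP.toLowerCase(),
                        tableOptions.get(HIVE_EXTERNAL_TABLE_PROP.toUpperCase()));
        // The value comparison is case-insensitive, so "true" and "TRUE" both match.
        return "TRUE".equalsIgnoreCase(externalPropValue);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("external", "true");
        System.out.println(externalFromOptions(options)); // true

        options.clear();
        options.put("EXTERNAL", "FALSE");
        System.out.println(externalFromOptions(options)); // false
    }
}

In the patched HiveCatalog this per-table check is OR-ed with the existing catalog-level table.type check, which is what the new Spark test exercises: it creates a table with CREATE EXTERNAL TABLE, drops it, and then asserts the data files can still be loaded from the warehouse path.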