[spark] Support creating external tables via Spark SQL statements. (#4576)
zhuangchong authored Nov 25, 2024
1 parent 8688206 commit b20907e
Showing 2 changed files with 60 additions and 5 deletions.
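
Sketched from the test added in this commit, this is the usage the change enables; the metastore URI, warehouse path, and database name below are placeholders, not part of the change. A Spark session whose spark_catalog is the Paimon SparkCatalog backed by a Hive metastore can now execute CREATE EXTERNAL TABLE, and dropping that table only removes the metastore entry while the table files remain in the warehouse.

    import org.apache.paimon.spark.SparkCatalog;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark =
            SparkSession.builder()
                    .config("spark.sql.catalogImplementation", "hive")
                    .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
                    .config("spark.sql.catalog.spark_catalog.metastore", "hive")
                    .config("spark.sql.catalog.spark_catalog.hive.metastore.uris", "thrift://localhost:9083")
                    .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/paimon-warehouse")
                    .master("local[2]")
                    .getOrCreate();

    spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
    spark.sql("USE spark_catalog.test_db");
    // Registered in the Hive metastore with EXTERNAL=TRUE by the HiveCatalog change below.
    spark.sql("CREATE EXTERNAL TABLE t1 (a INT, bb INT, c STRING)");
    // Removes the metastore entry; the files under <warehouse>/test_db.db/t1 stay on disk.
    spark.sql("DROP TABLE t1");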
Changed file 1 of 2
@@ -135,6 +135,7 @@ public class HiveCatalog extends AbstractCatalog {
             "org.apache.paimon.hive.PaimonStorageHandler";
     private static final String HIVE_PREFIX = "hive.";
     public static final String HIVE_SITE_FILE = "hive-site.xml";
+    private static final String HIVE_EXTERNAL_TABLE_PROP = "EXTERNAL";
 
     private final HiveConf hiveConf;
     private final String clientClassName;
@@ -218,7 +219,7 @@ private Pair<Path, Boolean> initialTableLocation(
             externalTable = true;
             location = new Path(tableOptions.get(CoreOptions.PATH.key()));
         } else {
-            externalTable = usingExternalTable();
+            externalTable = usingExternalTable(tableOptions);
             location = getTableLocation(identifier, null);
         }
         return Pair.of(location, externalTable);
@@ -659,12 +660,18 @@ public void createFormatTable(Identifier identifier, Schema schema) {
         }
     }
 
-    private boolean usingExternalTable() {
+    private boolean usingExternalTable(Map<String, String> tableOptions) {
         CatalogTableType tableType =
                 OptionsUtils.convertToEnum(
                         hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
                         CatalogTableType.class);
-        return CatalogTableType.EXTERNAL.equals(tableType);
+
+        String externalPropValue =
+                tableOptions.getOrDefault(
+                        HIVE_EXTERNAL_TABLE_PROP.toLowerCase(),
+                        tableOptions.get(HIVE_EXTERNAL_TABLE_PROP.toUpperCase()));
+        return CatalogTableType.EXTERNAL.equals(tableType)
+                || "TRUE".equalsIgnoreCase(externalPropValue);
     }
 
     @Override
@@ -962,7 +969,10 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
             if (newTable == null) {
                 newTable =
                         createHiveTable(
-                                identifier, tableSchema, location, usingExternalTable());
+                                identifier,
+                                tableSchema,
+                                location,
+                                usingExternalTable(tableSchema.options()));
             }
             Table finalNewTable = newTable;
             clients.execute(client -> client.createTable(finalNewTable));
@@ -1081,7 +1091,7 @@ private Table newHmsTable(
             table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
         }
         if (externalTable) {
-            table.getParameters().put("EXTERNAL", "TRUE");
+            table.getParameters().put(HIVE_EXTERNAL_TABLE_PROP, "TRUE");
         }
         return table;
     }
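The core of the HiveCatalog change is usingExternalTable(Map<String, String>): a table is now treated as external either when the catalog-level table.type configuration is EXTERNAL or when the table's own options carry EXTERNAL=TRUE. Spark's DataSourceV2 TableCatalog API hands the EXTERNAL keyword to the catalog as a lowercase "external" table property (TableCatalog.PROP_EXTERNAL), which is presumably why both the lower- and upper-case keys are probed. Below is a minimal standalone sketch of just that option lookup; the constant name comes from the diff above, everything else is illustrative.

    import java.util.Map;

    class ExternalFlagSketch {
        private static final String HIVE_EXTERNAL_TABLE_PROP = "EXTERNAL";

        // True if the per-table options request an external table, accepting both
        // the Spark-style "external" key and the Hive-style "EXTERNAL" key.
        static boolean externalRequested(Map<String, String> tableOptions) {
            String value =
                    tableOptions.getOrDefault(
                            HIVE_EXTERNAL_TABLE_PROP.toLowerCase(),
                            tableOptions.get(HIVE_EXTERNAL_TABLE_PROP.toUpperCase()));
            return "TRUE".equalsIgnoreCase(value);
        }

        public static void main(String[] args) {
            System.out.println(externalRequested(Map.of("external", "true")));  // true
            System.out.println(externalRequested(Map.of("EXTERNAL", "TRUE")));  // true
            System.out.println(externalRequested(Map.of()));                    // false
        }
    }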
Changed file 2 of 2
@@ -19,7 +19,9 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.hive.TestHiveMetastore;
+import org.apache.paimon.table.FileStoreTableFactory;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -32,6 +34,7 @@
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base tests for spark read. */
@@ -136,4 +139,46 @@ public void testSpecifyHiveConfDir(@TempDir java.nio.file.Path tempDir) {
 
         spark.close();
     }
+
+    @Test
+    public void testCreateExternalTable(@TempDir java.nio.file.Path tempDir) {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession spark =
+                SparkSession.builder()
+                        .config("spark.sql.warehouse.dir", warehousePath.toString())
+                        // with hive metastore
+                        .config("spark.sql.catalogImplementation", "hive")
+                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
+                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+                        .config(
+                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+                                "thrift://localhost:" + PORT)
+                        .config(
+                                "spark.sql.catalog.spark_catalog.warehouse",
+                                warehousePath.toString())
+                        .master("local[2]")
+                        .getOrCreate();
+
+        spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
+        spark.sql("USE spark_catalog.test_db");
+
+        // create hive external table
+        spark.sql("CREATE EXTERNAL TABLE t1 (a INT, bb INT, c STRING)");
+
+        // drop hive external table
+        spark.sql("DROP TABLE t1");
+
+        // file system table exists
+        assertThatCode(
+                        () ->
+                                FileStoreTableFactory.create(
+                                        LocalFileIO.create(),
+                                        new Path(
+                                                warehousePath,
+                                                String.format("%s.db/%s", "test_db", "t1"))))
+                .doesNotThrowAnyException();
+
+        spark.close();
+    }
 }
