From 7c5491bd9322766c8b50c441d3d42e12c2a9314e Mon Sep 17 00:00:00 2001
From: ZhongYujiang <42907416+zhongyujiang@users.noreply.github.com>
Date: Sun, 4 Aug 2024 17:38:53 +0800
Subject: [PATCH] Support creating tables with a user-defined location

---
 .../paimon/catalog/FileSystemCatalog.java     |  5 +++
 .../org/apache/paimon/jdbc/JdbcCatalog.java   |  5 +++
 .../org/apache/paimon/hive/HiveCatalog.java   | 36 +++++++++++++++----
 .../apache/paimon/hive/HiveCatalogTest.java   | 36 +++++++++++++++++++
 .../org/apache/paimon/spark/SparkCatalog.java |  5 +++
 5 files changed, 80 insertions(+), 7 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index d04b975dd7c03..fb287c808dde9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
@@ -36,6 +37,7 @@
 import java.util.concurrent.Callable;
 
 import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** A catalog implementation for {@link FileIO}. */
 public class FileSystemCatalog extends AbstractCatalog {
@@ -117,6 +119,9 @@ protected void dropTableImpl(Identifier identifier) {
 
     @Override
     public void createTableImpl(Identifier identifier, Schema schema) {
+        checkArgument(
+                !schema.options().containsKey(CoreOptions.PATH.key()),
+                "The FileSystemCatalog does not support specifying location when creating a table.");
         uncheck(() -> schemaManager(identifier).createTable(schema));
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index da08309ad69f2..6d409d585f219 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.jdbc;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.CatalogLockContext;
@@ -57,6 +58,7 @@
 import static org.apache.paimon.jdbc.JdbcUtils.execute;
 import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
 import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -235,6 +237,9 @@ protected void dropTableImpl(Identifier identifier) {
 
     @Override
     protected void createTableImpl(Identifier identifier, Schema schema) {
+        checkArgument(
+                !schema.options().containsKey(CoreOptions.PATH.key()),
+                "The JdbcCatalog does not support specifying location when creating a table.");
         try {
             // create table file
             getSchemaManager(identifier).createTable(schema);
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 4ed7c54d8a74f..8151d75f3f4dc 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -76,6 +76,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
@@ -454,7 +455,18 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
         // if changes on Hive fails there is no harm to perform the same changes to files again
         TableSchema tableSchema;
         try {
-            tableSchema = schemaManager(identifier).createTable(schema, usingExternalTable());
+            Path tableRoot;
+            if (schema.options().containsKey(CoreOptions.PATH.key())) {
+                checkArgument(
+                        Objects.equals(createTableType(), TableType.EXTERNAL),
+                        "The HiveCatalog only supports specifying location when creating an external table");
+                tableRoot = new Path(schema.options().get(CoreOptions.PATH.key()));
+            } else {
+                tableRoot = getDataTableLocation(identifier);
+            }
+
+            tableSchema =
+                    schemaManager(identifier, tableRoot).createTable(schema, usingExternalTable());
         } catch (Exception e) {
             throw new RuntimeException(
                     "Failed to commit changes of table "
@@ -669,10 +681,7 @@ public String warehouse() {
 
     private Table newHmsTable(Identifier identifier, Map<String, String> tableParameters) {
         long currentTimeMillis = System.currentTimeMillis();
-        TableType tableType =
-                OptionsUtils.convertToEnum(
-                        hiveConf.get(TABLE_TYPE.key(), TableType.MANAGED.toString()),
-                        TableType.class);
+        TableType tableType = createTableType();
         Table table =
                 new Table(
                         identifier.getTableName(),
@@ -697,6 +706,11 @@ private Table newHmsTable(Identifier identifier, Map<String, String> tableParame
         return table;
     }
 
+    private TableType createTableType() {
+        return OptionsUtils.convertToEnum(
+                hiveConf.get(TABLE_TYPE.key(), TableType.MANAGED.toString()), TableType.class);
+    }
+
     private void updateHmsTable(Table table, Identifier identifier, TableSchema schema) {
         StorageDescriptor sd = table.getSd() != null ? table.getSd() : new StorageDescriptor();
 
@@ -754,7 +768,11 @@ private void updateHmsTable(Table table, Identifier identifier, TableSchema sche
         }
 
         // update location
-        locationHelper.specifyTableLocation(table, getDataTableLocation(identifier).toString());
+        String location =
+                schema.options().containsKey(CoreOptions.PATH.key())
+                        ? schema.options().get(CoreOptions.PATH.key())
+                        : getDataTableLocation(identifier).toString();
+        locationHelper.specifyTableLocation(table, location);
     }
 
     private void updateHmsTablePars(Table table, TableSchema schema) {
@@ -778,6 +796,10 @@ private FieldSchema convertToFieldSchema(DataField dataField) {
     }
 
     private SchemaManager schemaManager(Identifier identifier) {
+        return schemaManager(identifier, getDataTableLocation(identifier));
+    }
+
+    private SchemaManager schemaManager(Identifier identifier, Path path) {
         return new SchemaManager(
                 fileIO,
-                getDataTableLocation(identifier),
+                path,
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index 6b13a80e801ad..9459d3ca2ae3b 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.hive;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -29,6 +31,7 @@
 import org.apache.paimon.utils.CommonTestUtils;
 import org.apache.paimon.utils.HadoopUtils;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -37,6 +40,7 @@
 import org.apache.thrift.TException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.lang.reflect.Field;
 import java.util.Arrays;
@@ -268,4 +272,36 @@ public void testAlterHiveTableParameters() {
             fail("Test failed due to exception: " + e.getMessage());
         }
     }
+
+    @Test
+    public void testCreateExternalTableWithLocation(@TempDir java.nio.file.Path tempDir)
+            throws Exception {
+        HiveConf hiveConf = new HiveConf();
+        String jdoConnectionURL = "jdbc:derby:memory:" + UUID.randomUUID();
+        hiveConf.setVar(METASTORECONNECTURLKEY, jdoConnectionURL + ";create=true");
+        hiveConf.set(CatalogOptions.TABLE_TYPE.key(), "external");
+        String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+        HiveCatalog externalWarehouseCatalog =
+                new HiveCatalog(fileIO, hiveConf, metastoreClientClass, warehouse);
+
+        String externalTablePath = tempDir.toString();
+
+        Schema schema =
+                new Schema(
+                        Lists.newArrayList(new DataField(0, "foo", DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        ImmutableMap.of("path", externalTablePath),
+                        "");
+
+        Identifier identifier = Identifier.create("default", "my_table");
+        externalWarehouseCatalog.createTable(identifier, schema, true);
+
+        org.apache.paimon.table.Table table = externalWarehouseCatalog.getTable(identifier);
+        assertThat(table.options())
+                .extracting(CoreOptions.PATH.key())
+                .isEqualTo("file:" + externalTablePath);
+
+        externalWarehouseCatalog.close();
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index a2ea6d0faa34c..6e507264110ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -392,6 +392,11 @@ private Schema toInitialSchema(
         Map<String, String> normalizedProperties = mergeSQLConf(properties);
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
         normalizedProperties.remove(TableCatalog.PROP_COMMENT);
+        if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
+            String path = normalizedProperties.remove(TableCatalog.PROP_LOCATION);
+            normalizedProperties.put(CoreOptions.PATH.key(), path);
+        }
+
         String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
         List<String> primaryKeys =
                 pkAsString == null
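
Usage sketch (illustrative, not part of the diff): the snippet below shows how the new behavior is expected to be exercised through the catalog Java API, mirroring testCreateExternalTableWithLocation above. The database name, table name, and path are made-up examples, and it assumes a catalog already configured with table.type = external (for HiveCatalog, the only mode that accepts a user-defined location after this change). On the Spark side, the SparkCatalog change above maps a table's LOCATION property onto the same "path" option, so CREATE TABLE ... LOCATION statements should take the same code path.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class CreateTableWithLocationExample {

    // "catalog" is assumed to be a HiveCatalog configured with table.type = external;
    // how it is built (metastore URI, warehouse, etc.) is outside the scope of this patch.
    static void createAtUserDefinedLocation(Catalog catalog) throws Exception {
        Schema schema =
                Schema.newBuilder()
                        .column("foo", DataTypes.INT())
                        // New in this patch: a user-supplied "path" option becomes the table location.
                        .option(CoreOptions.PATH.key(), "/tmp/paimon/external/my_table")
                        .build();

        // Creates the table whose data and schema files live under the given path.
        catalog.createTable(Identifier.create("default", "my_table"), schema, false);
    }
}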