diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 93018f12c9f1..fff593aabb62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -133,6 +133,10 @@ public boolean allowUpperCase() {
         return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
     }
 
+    protected boolean allowCustomTablePath() {
+        return false;
+    }
+
     @Override
     public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
             throws DatabaseAlreadyExistException {
@@ -272,6 +276,7 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
         validateIdentifierNameCaseInsensitive(identifier);
         validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
         validateAutoCreateClose(schema.options());
+        validateCustomTablePath(schema.options());
 
         // check db exists
         getDatabase(identifier.getDatabaseName());
@@ -590,6 +595,15 @@ private void validateAutoCreateClose(Map<String, String> options) {
                         CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
     }
 
+    private void validateCustomTablePath(Map<String, String> options) {
+        if (!allowCustomTablePath() && options.containsKey(CoreOptions.PATH.key())) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The current catalog %s does not support specifying the table path when creating a table.",
+                            this.getClass().getSimpleName()));
+        }
+    }
+
     // =============================== Meta in File System =====================================
 
     protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index 9984e3feef0c..33309a7cecc9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -341,13 +341,4 @@ public Schema build() {
             return new Schema(columns, partitionKeys, primaryKeys, options, comment);
         }
     }
-
-    public static Schema fromTableSchema(TableSchema tableSchema) {
-        return new Schema(
-                tableSchema.fields(),
-                tableSchema.partitionKeys(),
-                tableSchema.primaryKeys(),
-                tableSchema.options(),
-                tableSchema.comment());
-    }
 }
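
Note: with this change, catalogs must opt in to user-specified table locations via allowCustomTablePath() (default false), and createTable rejects a 'path' option up front. A minimal sketch of what callers now see against a catalog that keeps the default; the identifier, column, and location below are illustrative, not part of this patch, and checked catalog exceptions are assumed handled by an enclosing method:

    // Hedged sketch: assumes a catalog whose allowCustomTablePath() is false,
    // e.g. the filesystem catalog.
    Schema schema =
            Schema.newBuilder()
                    .column("id", DataTypes.INT())
                    .option(CoreOptions.PATH.key(), "/tmp/custom/location") // user-supplied path
                    .build();
    try {
        catalog.createTable(Identifier.create("default", "t"), schema, false);
    } catch (UnsupportedOperationException e) {
        // "The current catalog ... does not support specifying the table path
        // when creating a table."
    }
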
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
index 883d7b06ab5f..67eecbc6f2ae 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java
@@ -18,15 +18,21 @@
 
 package org.apache.paimon.flink.clone;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
+
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -37,6 +43,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Pick the files to be cloned of a table based on the input record. The record type it produce is
@@ -77,7 +84,7 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
         FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
         targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
         targetCatalog.createTable(
-                targetIdentifier, Schema.fromTableSchema(sourceTable.schema()), true);
+                targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true);
 
         List<CloneFileInfo> result =
                 toCloneFileInfos(
@@ -95,6 +102,18 @@ public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) th
         }
     }
 
+    private static Schema newSchemaFromTableSchema(TableSchema tableSchema) {
+        return new Schema(
+                ImmutableList.copyOf(tableSchema.fields()),
+                ImmutableList.copyOf(tableSchema.partitionKeys()),
+                ImmutableList.copyOf(tableSchema.primaryKeys()),
+                ImmutableMap.copyOf(
+                        Iterables.filter(
+                                tableSchema.options().entrySet(),
+                                entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))),
+                tableSchema.comment());
+    }
+
     private List<CloneFileInfo> toCloneFileInfos(
             List<CloneFileInfo> files,
             Path sourceTableRoot,
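
Note: with Schema.fromTableSchema removed, the clone operator builds the target schema itself and filters out the source table's 'path' option, so the cloned table gets a location derived by the target catalog instead of pointing back at the source files. The Guava filtering above is equivalent to this plain-JDK sketch (illustrative only):

    // JDK-only equivalent of the Iterables.filter call above.
    Map<String, String> options = new HashMap<>(tableSchema.options());
    options.remove(CoreOptions.PATH.key()); // cloned table must get its own location
    Schema copied =
            new Schema(
                    tableSchema.fields(),
                    tableSchema.partitionKeys(),
                    tableSchema.primaryKeys(),
                    options,
                    tableSchema.comment());
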
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index eed0fdb9bff1..93e7e87ef5c7 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -48,6 +48,7 @@
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
@@ -83,7 +84,6 @@
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -210,6 +210,20 @@ public Path getTableLocation(Identifier identifier) {
         return getTableLocation(identifier, table);
     }
 
+    private Pair<Path, Boolean> initialTableLocation(
+            Map<String, String> tableOptions, Identifier identifier) {
+        boolean externalTable;
+        Path location;
+        if (tableOptions.containsKey(CoreOptions.PATH.key())) {
+            externalTable = true;
+            location = new Path(tableOptions.get(CoreOptions.PATH.key()));
+        } else {
+            externalTable = usingExternalTable();
+            location = getTableLocation(identifier, null);
+        }
+        return Pair.of(location, externalTable);
+    }
+
     private Path getTableLocation(Identifier identifier, @Nullable Table table) {
         try {
             String databaseName = identifier.getDatabaseName();
@@ -634,8 +648,10 @@ public void createFormatTable(Identifier identifier, Schema schema) {
                         options,
                         schema.comment());
         try {
-            Path location = getTableLocation(identifier, null);
-            Table hiveTable = createHiveFormatTable(identifier, newSchema, location);
+            Pair<Path, Boolean> pair = initialTableLocation(schema.options(), identifier);
+            Path location = pair.getLeft();
+            boolean externalTable = pair.getRight();
+            Table hiveTable = createHiveFormatTable(identifier, newSchema, location, externalTable);
             clients.execute(client -> client.createTable(hiveTable));
         } catch (Exception e) {
             // we don't need to delete directories since HMS will roll back db and fs if failed.
@@ -654,18 +670,19 @@ private boolean usingExternalTable() {
     @Override
     protected void dropTableImpl(Identifier identifier) {
         try {
+            boolean externalTable = isExternalTable(getHmsTable(identifier));
             clients.execute(
                     client ->
                             client.dropTable(
                                     identifier.getDatabaseName(),
                                     identifier.getTableName(),
-                                    true,
+                                    !externalTable,
                                     false,
                                     true));
 
             // When drop a Hive external table, only the hive metadata is deleted and the data files
             // are not deleted.
-            if (usingExternalTable()) {
+            if (externalTable) {
                 return;
             }
 
@@ -680,7 +697,7 @@ protected void dropTableImpl(Identifier identifier) {
             } catch (Exception ee) {
                 LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee);
             }
-        } catch (TException e) {
+        } catch (TException | TableNotExistException e) {
             throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -691,13 +708,12 @@ protected void dropTableImpl(Identifier identifier) {
 
     @Override
     protected void createTableImpl(Identifier identifier, Schema schema) {
-        // first commit changes to underlying files
-        // if changes on Hive fails there is no harm to perform the same changes to files again
-        Path location = getTableLocation(identifier, null);
+        Pair<Path, Boolean> pair = initialTableLocation(schema.options(), identifier);
+        Path location = pair.getLeft();
+        boolean externalTable = pair.getRight();
         TableSchema tableSchema;
         try {
-            tableSchema =
-                    schemaManager(identifier, location).createTable(schema, usingExternalTable());
+            tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
         } catch (Exception e) {
             throw new RuntimeException(
                     "Failed to commit changes of table "
@@ -709,7 +725,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
         try {
             clients.execute(
                     client ->
-                            client.createTable(createHiveTable(identifier, tableSchema, location)));
+                            client.createTable(
+                                    createHiveTable(
+                                            identifier, tableSchema, location, externalTable)));
         } catch (Exception e) {
             try {
                 fileIO.deleteDirectoryQuietly(location);
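
Note: in the hunks above, external vs. managed is now decided per table. initialTableLocation treats an explicit 'path' option as an external table and otherwise falls back to the catalog-level setting, and dropTableImpl asks HMS for the actual table type instead of consulting usingExternalTable(). For reference, the metastore call used above reads as follows with its boolean parameters annotated (a sketch of the same call, not new behavior):

    // deleteData is false for external tables, so HMS keeps the data files.
    client.dropTable(
            identifier.getDatabaseName(),
            identifier.getTableName(),
            /* deleteData = */ !externalTable,
            /* ignoreUnknownTab = */ false,
            /* ifPurge = */ true);
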
@@ -720,7 +738,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
         }
     }
 
-    private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Path location) {
+    private Table createHiveTable(
+            Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
         checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE);
 
         Map<String, String> tblProperties;
@@ -740,13 +759,14 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Pa
             }
         }
 
-        Table table = newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE);
+        Table table =
+                newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE, externalTable);
         updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location);
         return table;
     }
 
     private Table createHiveFormatTable(
-            Identifier identifier, TableSchema tableSchema, Path location) {
+            Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
         Options options = Options.fromMap(tableSchema.options());
         checkArgument(options.get(TYPE) == FORMAT_TABLE);
 
@@ -757,7 +777,7 @@ private Table createHiveFormatTable(
 
         Map<String, String> tblProperties = new HashMap<>();
 
-        Table table = newHmsTable(identifier, tblProperties, provider);
+        Table table = newHmsTable(identifier, tblProperties, provider, externalTable);
         updateHmsTable(table, identifier, tableSchema, provider, location);
 
         if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
@@ -865,6 +885,11 @@ public boolean allowUpperCase() {
         return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false);
     }
 
+    @Override
+    protected boolean allowCustomTablePath() {
+        return true;
+    }
+
     public boolean syncAllProperties() {
         return catalogOptions.get(SYNC_ALL_PROPERTIES);
     }
@@ -921,10 +946,13 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
         TableSchema tableSchema =
                 tableSchemaInFileSystem(location, identifier.getBranchNameOrDefault())
                         .orElseThrow(() -> new TableNotExistException(identifier));
-        Table newTable = createHiveTable(identifier, tableSchema, location);
+        try {
+            Table newTable = null;
             try {
                 Table table = getHmsTable(identifier);
+                newTable =
+                        createHiveTable(identifier, tableSchema, location, isExternalTable(table));
                 checkArgument(
                         isPaimonTable(table),
                         "Table %s is not a paimon table in hive metastore.",
@@ -935,7 +963,13 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
             }
         } catch (TableNotExistException e) {
             // hive table does not exist.
-            clients.execute(client -> client.createTable(newTable));
+            if (newTable == null) {
+                newTable =
+                        createHiveTable(
+                                identifier, tableSchema, location, usingExternalTable());
+            }
+            Table finalNewTable = newTable;
+            clients.execute(client -> client.createTable(finalNewTable));
         }
 
         // repair partitions
@@ -1012,13 +1046,16 @@ public static boolean isView(Table table) {
         return table != null && TableType.VIRTUAL_VIEW.name().equals(table.getTableType());
     }
 
+    private boolean isExternalTable(Table table) {
+        return table != null && TableType.EXTERNAL_TABLE.name().equals(table.getTableType());
+    }
+
     private Table newHmsTable(
-            Identifier identifier, Map<String, String> tableParameters, String provider) {
+            Identifier identifier,
+            Map<String, String> tableParameters,
+            String provider,
+            boolean externalTable) {
         long currentTimeMillis = System.currentTimeMillis();
-        CatalogTableType tableType =
-                OptionsUtils.convertToEnum(
-                        hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
-                        CatalogTableType.class);
         if (provider == null) {
             provider = PAIMON_TABLE_TYPE_VALUE;
         }
@@ -1036,7 +1073,9 @@ private Table newHmsTable(
                 tableParameters,
                 null,
                 null,
-                tableType.toString().toUpperCase(Locale.ROOT) + "_TABLE");
+                externalTable
+                        ? TableType.EXTERNAL_TABLE.name()
+                        : TableType.MANAGED_TABLE.name());
         table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
         if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {
             table.getParameters()
@@ -1045,7 +1084,7 @@ private Table newHmsTable(
             table.getParameters().put(FILE_FORMAT.key(), provider.toLowerCase());
             table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
         }
-        if (CatalogTableType.EXTERNAL.equals(tableType)) {
+        if (externalTable) {
             table.getParameters().put("EXTERNAL", "TRUE");
         }
         return table;
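
Note: the HMS table type previously came from the catalog-wide 'table.type' configuration (hence the removed CatalogTableType lookup); it now follows the per-table externalTable flag. External tables need both pieces of metadata, since the metastore only treats a table as external when the table type and the parameter agree. Roughly, as a sketch against the Thrift Table API:

    // Sketch: HMS metadata produced for an external Paimon table.
    table.setTableType(TableType.EXTERNAL_TABLE.name());
    table.getParameters().put("EXTERNAL", "TRUE"); // required in addition to the type
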
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index dcd770c57f51..3ba3f89e412f 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.paimon.hive;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -30,6 +32,7 @@
 import org.apache.paimon.utils.CommonTestUtils;
 import org.apache.paimon.utils.HadoopUtils;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -38,6 +41,7 @@
 import org.apache.thrift.TException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -359,4 +363,36 @@ protected boolean supportsView() {
     protected boolean supportsFormatTable() {
         return true;
     }
+
+    @Test
+    public void testCreateExternalTableWithLocation(@TempDir java.nio.file.Path tempDir)
+            throws Exception {
+        HiveConf hiveConf = new HiveConf();
+        String jdoConnectionURL = "jdbc:derby:memory:" + UUID.randomUUID();
+        hiveConf.setVar(METASTORECONNECTURLKEY, jdoConnectionURL + ";create=true");
+        hiveConf.set(CatalogOptions.TABLE_TYPE.key(), "external");
+        String metastoreClientClass = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+        HiveCatalog externalWarehouseCatalog =
+                new HiveCatalog(fileIO, hiveConf, metastoreClientClass, warehouse);
+
+        String externalTablePath = tempDir.toString();
+
+        Schema schema =
+                new Schema(
+                        Lists.newArrayList(new DataField(0, "foo", DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        ImmutableMap.of("path", externalTablePath),
+                        "");
+
+        Identifier identifier = Identifier.create("default", "my_table");
+        externalWarehouseCatalog.createTable(identifier, schema, true);
+
+        org.apache.paimon.table.Table table = externalWarehouseCatalog.getTable(identifier);
+        assertThat(table.options())
+                .extracting(CoreOptions.PATH.key())
+                .isEqualTo("file:" + externalTablePath);
+
+        externalWarehouseCatalog.close();
+    }
 }
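
Note: the catalog stores the scheme-qualified location in the 'path' option, which is why the assertion expects a 'file:' prefix in front of the @TempDir path. A follow-on check one could add inside the same test (a sketch, not part of this patch) would pin down the new drop behavior as well:

    // Hypothetical extension: dropping the external table must keep its files,
    // per the dropTableImpl change above.
    externalWarehouseCatalog.dropTable(identifier, false);
    assertThat(fileIO.exists(new Path(externalTablePath))).isTrue();
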
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 3b9af1694eef..5ad1b13b7c7b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -411,6 +411,10 @@ private Schema toInitialSchema(
         }
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
         normalizedProperties.remove(TableCatalog.PROP_COMMENT);
+        if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
+            String path = normalizedProperties.remove(TableCatalog.PROP_LOCATION);
+            normalizedProperties.put(CoreOptions.PATH.key(), path);
+        }
         String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
         List<String> primaryKeys =
                 pkAsString == null
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 39b1947e4f37..b9a90d8b5bef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -64,6 +64,9 @@ case class SparkTable(table: Table)
         if (table.comment.isPresent) {
           properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
         }
+        if (properties.containsKey(CoreOptions.PATH.key())) {
+          properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
+        }
         properties
       case _ => Collections.emptyMap()
     }
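
Note: these two changes form a round trip. toInitialSchema maps Spark's LOCATION clause (TableCatalog.PROP_LOCATION) onto Paimon's 'path' option at create time, and SparkTable.properties() maps 'path' back to PROP_LOCATION so SHOW CREATE TABLE renders a LOCATION clause. From a Spark session with a Paimon catalog configured (table name and location illustrative):

    // Round-trip sketch: LOCATION '...' becomes options()['path'] and back.
    spark.sql("CREATE TABLE t (id INT) USING paimon LOCATION '/tmp/paimon/ext_t'");
    spark.sql("SHOW CREATE TABLE t").show(false); // output now includes LOCATION
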
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index b4565447c6fc..b00267410a7f 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -190,17 +190,20 @@ public void testCreateTableAs() {
         spark.sql("INSERT INTO partitionedTable VALUES(1,'aaa','bbb')");
         spark.sql(
                 "CREATE TABLE partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM partitionedTable");
+        Path tablePath = new Path(warehousePath, "default.db/partitionedTableAs");
         assertThat(spark.sql("SHOW CREATE TABLE partitionedTableAs").collectAsList().toString())
                 .isEqualTo(
                         String.format(
                                 "[[%s"
                                         + "PARTITIONED BY (a)\n"
+                                        + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
                                         + "  'path' = '%s')\n"
                                         + "]]",
                                 showCreateString(
                                         "partitionedTableAs", "a BIGINT", "b STRING", "c STRING"),
-                                new Path(warehousePath, "default.db/partitionedTableAs")));
+                                tablePath,
+                                tablePath));
         List<Row> resultPartition = spark.sql("SELECT * FROM partitionedTableAs").collectAsList();
         assertThat(resultPartition.stream().map(Row::toString))
                 .containsExactlyInAnyOrder("[1,aaa,bbb]");
@@ -217,17 +220,20 @@ public void testCreateTableAs() {
         spark.sql("INSERT INTO testTable VALUES(1,'a','b')");
         spark.sql(
                 "CREATE TABLE testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM testTable");
+        tablePath = new Path(warehousePath, "default.db/testTableAs");
         assertThat(spark.sql("SHOW CREATE TABLE testTableAs").collectAsList().toString())
                 .isEqualTo(
                         String.format(
                                 "[[%s"
+                                        + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
                                         + "  'file.format' = 'parquet',\n"
                                         + "  'path' = '%s')\n"
                                         + "]]",
                                 showCreateString(
                                         "testTableAs", "a BIGINT", "b VARCHAR(10)", "c CHAR(10)"),
-                                new Path(warehousePath, "default.db/testTableAs")));
+                                tablePath,
+                                tablePath));
 
         List<Row> resultProp = spark.sql("SELECT * FROM testTableAs").collectAsList();
         assertThat(resultProp.stream().map(Row::toString))
@@ -245,13 +251,17 @@ public void testCreateTableAs() {
                         + "COMMENT 'table comment'");
         spark.sql("INSERT INTO t_pk VALUES(1,'aaa','bbb')");
         spark.sql("CREATE TABLE t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM t_pk");
+        tablePath = new Path(warehousePath, "default.db/t_pk_as");
         assertThat(spark.sql("SHOW CREATE TABLE t_pk_as").collectAsList().toString())
                 .isEqualTo(
                         String.format(
-                                "[[%sTBLPROPERTIES (\n  'path' = '%s',\n  'primary-key' = 'a')\n]]",
+                                "[[%s"
+                                        + "LOCATION '%s'\n"
+                                        + "TBLPROPERTIES (\n  'path' = '%s',\n  'primary-key' = 'a')\n]]",
                                 showCreateString(
                                         "t_pk_as", "a BIGINT NOT NULL", "b STRING", "c STRING"),
-                                new Path(warehousePath, "default.db/t_pk_as")));
+                                tablePath,
+                                tablePath));
 
         List<Row> resultPk = spark.sql("SELECT * FROM t_pk_as").collectAsList();
         assertThat(resultPk.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,aaa,bbb]");
@@ -270,11 +280,13 @@ public void testCreateTableAs() {
         spark.sql("INSERT INTO t_all VALUES(1,2,'bbb','2020-01-01','12')");
         spark.sql(
                 "CREATE TABLE t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM t_all");
+        tablePath = new Path(warehousePath, "default.db/t_all_as");
         assertThat(spark.sql("SHOW CREATE TABLE t_all_as").collectAsList().toString())
                 .isEqualTo(
                         String.format(
                                 "[[%s"
                                         + "PARTITIONED BY (dt)\n"
+                                        + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
                                         + "  'path' = '%s',\n"
                                         + "  'primary-key' = 'dt,hh')\n"
@@ -286,7 +298,8 @@ public void testCreateTableAs() {
                                         "behavior STRING",
                                         "dt STRING NOT NULL",
                                         "hh STRING NOT NULL"),
-                                new Path(warehousePath, "default.db/t_all_as")));
+                                tablePath,
+                                tablePath));
         List<Row> resultAll = spark.sql("SELECT * FROM t_all_as").collectAsList();
         assertThat(resultAll.stream().map(Row::toString))
                 .containsExactlyInAnyOrder("[1,2,bbb,2020-01-01,12]");
@@ -363,12 +376,14 @@ public void testShowCreateTable() {
                         + "  'k1' = 'v1'\n"
                         + ")");
 
+        Path tablePath = new Path(warehousePath, "default.db/tbl");
         assertThat(spark.sql("SHOW CREATE TABLE tbl").collectAsList().toString())
                 .isEqualTo(
                         String.format(
                                 "[[%s"
                                         + "PARTITIONED BY (b)\n"
                                         + "COMMENT 'tbl comment'\n"
+                                        + "LOCATION '%s'\n"
                                         + "TBLPROPERTIES (\n"
                                         + "  'k1' = 'v1',\n"
                                         + "  'path' = '%s',\n"
@@ -377,7 +392,8 @@ public void testShowCreateTable() {
                                 "tbl",
                                 "a INT NOT NULL COMMENT 'a comment'",
                                 "b STRING NOT NULL"),
-                                new Path(warehousePath, "default.db/tbl")));
+                                tablePath,
+                                tablePath));
     }
 
     @Test
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index ccd705e26967..842147615d1a 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.Snapshot
 import org.apache.paimon.hive.TestHiveMetastore
 
 import org.apache.hadoop.conf.Configuration
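
Note: the next two tests pin down the split behavior between catalogs. The filesystem-backed catalog keeps allowCustomTablePath() == false, so CREATE TABLE ... LOCATION fails with the new UnsupportedOperationException, while HiveCatalog overrides it to true and registers the table as external in HMS. Roughly (catalog names illustrative, session setup elided):

    // Filesystem catalog: rejected with UnsupportedOperationException.
    spark.sql("CREATE TABLE fs_cat.db.t (id INT) USING paimon LOCATION '/tmp/t'");
    // Hive catalog: accepted; table type in HMS becomes EXTERNAL_TABLE.
    spark.sql("CREATE TABLE hive_cat.db.t (id INT) USING paimon LOCATION '/tmp/t'");
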
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index cf1a71d51fcc..b09a2be98dc8 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -546,4 +546,28 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
       }
     }
   }
+
+  test("Paimon DDL: create and drop external / managed table") {
+    withTempDir {
+      tbLocation =>
+        withTable("external_tbl", "managed_tbl") {
+          // create external table
+          val error = intercept[UnsupportedOperationException] {
+            sql(
+              s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '${tbLocation.getCanonicalPath}'")
+          }.getMessage
+          assert(error.contains("not support"))
+
+          // create managed table
+          sql("CREATE TABLE managed_tbl (id INT) USING paimon")
+          val table = loadTable("managed_tbl")
+          val fileIO = table.fileIO()
+          val tableLocation = table.location()
+
+          // drop managed table
+          sql("DROP TABLE managed_tbl")
+          assert(!fileIO.exists(tableLocation))
+        }
+    }
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 56922ae2aeff..bfd6716b2128 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -297,6 +297,56 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
     }
   }
 
+  test("Paimon DDL with hive catalog: create and drop external / managed table") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        spark.sql(s"USE $catalogName")
+        withTempDir {
+          tbLocation =>
+            withDatabase("paimon_db") {
+              spark.sql(s"CREATE DATABASE paimon_db")
+              spark.sql(s"USE paimon_db")
+              withTable("external_tbl", "managed_tbl") {
+                val expertTbLocation = tbLocation.getCanonicalPath
+                // create external table
+                spark.sql(
+                  s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '$expertTbLocation'")
+                spark.sql("INSERT INTO external_tbl VALUES (1)")
+                checkAnswer(spark.sql("SELECT * FROM external_tbl"), Row(1))
+                val table = loadTable("paimon_db", "external_tbl")
+                val fileIO = table.fileIO()
+                val actualTbLocation = table.location()
+                assert(actualTbLocation.toString.split(':').apply(1).equals(expertTbLocation))
+
+                // drop external table
+                spark.sql("DROP TABLE external_tbl")
+                assert(fileIO.exists(actualTbLocation))
+
+                // create external table again using the same location
+                spark.sql(
+                  s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '$expertTbLocation'")
+                checkAnswer(spark.sql("SELECT * FROM external_tbl"), Row(1))
+                assert(
+                  loadTable("paimon_db", "external_tbl")
+                    .location()
+                    .toString
+                    .split(':')
+                    .apply(1)
+                    .equals(expertTbLocation))
+
+                // create managed table
+                spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
+                val managedTbLocation = loadTable("paimon_db", "managed_tbl").location()
+
+                // drop managed table
+                spark.sql("DROP TABLE managed_tbl")
+                assert(!fileIO.exists(managedTbLocation))
+              }
+            }
+        }
+    }
+  }
+
   def getDatabaseProp(dbName: String, propertyName: String): String = {
     spark
       .sql(s"DESC DATABASE EXTENDED $dbName")