From fadf650303f77a795e994afffea04110cc3e2404 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Wed, 20 Nov 2024 02:21:55 +0800
Subject: [PATCH] update

---
 .../paimon/catalog/AbstractCatalog.java       | 14 ++++
 .../org/apache/paimon/hive/HiveCatalog.java   | 77 +++++++++++++------
 .../apache/paimon/hive/HiveCatalogTest.java   |  2 +-
 .../org/apache/paimon/spark/SparkCatalog.java |  1 -
 .../org/apache/paimon/spark/SparkTable.scala  |  3 +
 .../apache/paimon/spark/SparkReadITCase.java  | 27 +++----
 .../paimon/spark/PaimonHiveTestBase.scala     |  1 -
 .../apache/paimon/spark/sql/DDLTestBase.scala | 24 ++++++
 .../sql/DDLWithHiveCatalogTestBase.scala      | 50 ++++++++++++
 9 files changed, 155 insertions(+), 44 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 93018f12c9f1..1b6f1b8d812b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -133,6 +133,10 @@ public boolean allowUpperCase() {
         return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
     }
 
+    protected boolean externalTableEnabled() {
+        return false;
+    }
+
     @Override
     public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
             throws DatabaseAlreadyExistException {
@@ -272,6 +276,7 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
         validateIdentifierNameCaseInsensitive(identifier);
         validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
         validateAutoCreateClose(schema.options());
+        validateExternalTableSupport(schema.options());
 
         // check db exists
         getDatabase(identifier.getDatabaseName());
@@ -590,6 +595,15 @@ private void validateAutoCreateClose(Map<String, String> options) {
                             CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
     }
 
+    private void validateExternalTableSupport(Map<String, String> options) {
+        if (!externalTableEnabled() && options.containsKey(CoreOptions.PATH.key())) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "The current catalog %s does not support external tables, so specifying the path is not allowed when creating a table.",
+                            this.getClass().getSimpleName()));
+        }
+    }
+
     // =============================== Meta in File System =====================================
 
     protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOException {
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index eed0fdb9bff1..505c9f208045 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -83,7 +83,6 @@
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -210,6 +209,12 @@ public Path getTableLocation(Identifier identifier) {
         return getTableLocation(identifier, table);
     }
 
+    private Path initialTableLocation(Map<String, String> tableOptions, Identifier identifier) {
+        return tableOptions.containsKey(CoreOptions.PATH.key())
+                ? new Path(tableOptions.get(CoreOptions.PATH.key()))
+                : getTableLocation(identifier, null);
+    }
+
     private Path getTableLocation(Identifier identifier, @Nullable Table table) {
         try {
             String databaseName = identifier.getDatabaseName();
@@ -634,8 +639,11 @@ public void createFormatTable(Identifier identifier, Schema schema) {
                         options,
                         schema.comment());
         try {
-            Path location = getTableLocation(identifier, null);
-            Table hiveTable = createHiveFormatTable(identifier, newSchema, location);
+            Map<String, String> tableOptions = schema.options();
+            boolean externalTable =
+                    tableOptions.containsKey(CoreOptions.PATH.key()) || usingExternalTable();
+            Path location = initialTableLocation(tableOptions, identifier);
+            Table hiveTable = createHiveFormatTable(identifier, newSchema, location, externalTable);
             clients.execute(client -> client.createTable(hiveTable));
         } catch (Exception e) {
             // we don't need to delete directories since HMS will roll back db and fs if failed.
@@ -654,18 +662,19 @@ private boolean usingExternalTable() {
 
     @Override
     protected void dropTableImpl(Identifier identifier) {
         try {
+            boolean externalTable = isExternalTable(getHmsTable(identifier));
             clients.execute(
                     client ->
                             client.dropTable(
                                     identifier.getDatabaseName(),
                                     identifier.getTableName(),
-                                    true,
+                                    !externalTable,
                                     false,
                                     true));
 
             // When drop a Hive external table, only the hive metadata is deleted and the data files
             // are not deleted.
-            if (usingExternalTable()) {
+            if (externalTable) {
                 return;
             }
@@ -680,7 +689,7 @@ protected void dropTableImpl(Identifier identifier) {
             } catch (Exception ee) {
                 LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee);
             }
-        } catch (TException e) {
+        } catch (TException | TableNotExistException e) {
             throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -691,13 +700,12 @@ protected void dropTableImpl(Identifier identifier) {
 
     @Override
     protected void createTableImpl(Identifier identifier, Schema schema) {
-        // first commit changes to underlying files
-        // if changes on Hive fails there is no harm to perform the same changes to files again
-        Path location = getTableLocation(identifier, null);
+        Map<String, String> tableOptions = schema.options();
+        boolean externalTable = tableOptions.containsKey(CoreOptions.PATH.key()) || usingExternalTable();
+        Path location = initialTableLocation(tableOptions, identifier);
         TableSchema tableSchema;
         try {
-            tableSchema =
-                    schemaManager(identifier, location).createTable(schema, usingExternalTable());
+            tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
         } catch (Exception e) {
             throw new RuntimeException(
                     "Failed to commit changes of table "
@@ -709,7 +717,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
         try {
             clients.execute(
                     client ->
-                            client.createTable(createHiveTable(identifier, tableSchema, location)));
+                            client.createTable(
+                                    createHiveTable(
+                                            identifier, tableSchema, location, externalTable)));
         } catch (Exception e) {
             try {
                 fileIO.deleteDirectoryQuietly(location);
@@ -720,7 +730,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
         }
     }
 
-    private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Path location) {
+    private Table createHiveTable(
+            Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
         checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE);
 
         Map<String, String> tblProperties;
@@ -740,13 +751,14 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Pa
             }
         }
 
-        Table table = newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE);
+        Table table =
+                newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE, externalTable);
         updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location);
         return table;
     }
 
     private Table createHiveFormatTable(
-            Identifier identifier, TableSchema tableSchema, Path location) {
+            Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
         Options options = Options.fromMap(tableSchema.options());
         checkArgument(options.get(TYPE) == FORMAT_TABLE);
@@ -757,7 +769,7 @@ private Table createHiveFormatTable(
 
         Map<String, String> tblProperties = new HashMap<>();
 
-        Table table = newHmsTable(identifier, tblProperties, provider);
+        Table table = newHmsTable(identifier, tblProperties, provider, externalTable);
         updateHmsTable(table, identifier, tableSchema, provider, location);
 
         if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
@@ -865,6 +877,11 @@ public boolean allowUpperCase() {
         return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false);
     }
 
+    @Override
+    protected boolean externalTableEnabled() {
+        return true;
+    }
+
     public boolean syncAllProperties() {
         return catalogOptions.get(SYNC_ALL_PROPERTIES);
     }
@@ -921,10 +938,12 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
         TableSchema tableSchema =
                 tableSchemaInFileSystem(location, identifier.getBranchNameOrDefault())
                         .orElseThrow(() -> new TableNotExistException(identifier));
-        Table newTable = createHiveTable(identifier, tableSchema, location);
+        Table newTable = null;
         try {
             Table table = getHmsTable(identifier);
+            newTable =
+                    createHiveTable(identifier, tableSchema, location, isExternalTable(table));
             checkArgument(
                     isPaimonTable(table),
                     "Table %s is not a paimon table in hive metastore.",
                     identifier.getFullName());
@@ -935,7 +954,9 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
             }
         } catch (TableNotExistException e) {
             // hive table does not exist.
-            clients.execute(client -> client.createTable(newTable));
+            newTable = createHiveTable(identifier, tableSchema, location, usingExternalTable());
+            Table finalNewTable = newTable;
+            clients.execute(client -> client.createTable(finalNewTable));
         }
 
         // repair partitions
@@ -1012,13 +1033,17 @@ public static boolean isView(Table table) {
         return table != null && TableType.VIRTUAL_VIEW.name().equals(table.getTableType());
     }
 
+    private boolean isExternalTable(Table table) {
+        return (table != null && TableType.EXTERNAL_TABLE.name().equals(table.getTableType()))
+                || usingExternalTable();
+    }
+
     private Table newHmsTable(
-            Identifier identifier, Map<String, String> tableParameters, String provider) {
+            Identifier identifier,
+            Map<String, String> tableParameters,
+            String provider,
+            boolean externalTable) {
         long currentTimeMillis = System.currentTimeMillis();
-        CatalogTableType tableType =
-                OptionsUtils.convertToEnum(
-                        hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
-                        CatalogTableType.class);
         if (provider == null) {
             provider = PAIMON_TABLE_TYPE_VALUE;
         }
@@ -1036,7 +1061,9 @@ private Table newHmsTable(
                         tableParameters,
                         null,
                         null,
-                        tableType.toString().toUpperCase(Locale.ROOT) + "_TABLE");
+                        externalTable
+                                ? TableType.EXTERNAL_TABLE.name()
+                                : TableType.MANAGED_TABLE.name());
         table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
         if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {
             table.getParameters()
@@ -1045,7 +1072,7 @@ private Table newHmsTable(
             table.getParameters().put(FILE_FORMAT.key(), provider.toLowerCase());
             table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
         }
-        if (CatalogTableType.EXTERNAL.equals(tableType)) {
+        if (externalTable) {
             table.getParameters().put("EXTERNAL", "TRUE");
         }
         return table;
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index 178cc3c19bb4..3ba3f89e412f 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.hive;
 
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.client.ClientPool;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index fd19f8801881..5ad1b13b7c7b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -415,7 +415,6 @@ private Schema toInitialSchema(
             String path = normalizedProperties.remove(TableCatalog.PROP_LOCATION);
             normalizedProperties.put(CoreOptions.PATH.key(), path);
         }
-
         String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
         List<String> primaryKeys =
                 pkAsString == null
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 39b1947e4f37..b9a90d8b5bef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -64,6 +64,9 @@ case class SparkTable(table: Table)
         if (table.comment.isPresent) {
           properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
         }
+        if (properties.containsKey(CoreOptions.PATH.key())) {
+          properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
+        }
         properties
       case _ => Collections.emptyMap()
     }
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index d01ea5f1454b..b00267410a7f 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -190,8 +190,7 @@ public void testCreateTableAs() {
         spark.sql("INSERT INTO partitionedTable VALUES(1,'aaa','bbb')");
         spark.sql(
                 "CREATE TABLE partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM partitionedTable");
-
-        String tablePath = new Path(warehousePath, "default.db/partitionedTableAs").toString();
+        Path tablePath = new Path(warehousePath, "default.db/partitionedTableAs");
"default.db/partitionedTableAs"); assertThat(spark.sql("SHOW CREATE TABLE partitionedTableAs").collectAsList().toString()) .isEqualTo( String.format( @@ -221,8 +220,7 @@ public void testCreateTableAs() { spark.sql("INSERT INTO testTable VALUES(1,'a','b')"); spark.sql( "CREATE TABLE testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM testTable"); - - String testTableAsPath = new Path(warehousePath, "default.db/testTableAs").toString(); + tablePath = new Path(warehousePath, "default.db/testTableAs"); assertThat(spark.sql("SHOW CREATE TABLE testTableAs").collectAsList().toString()) .isEqualTo( String.format( @@ -234,8 +232,8 @@ public void testCreateTableAs() { + "]]", showCreateString( "testTableAs", "a BIGINT", "b VARCHAR(10)", "c CHAR(10)"), - testTableAsPath, - testTableAsPath)); + tablePath, + tablePath)); List resultProp = spark.sql("SELECT * FROM testTableAs").collectAsList(); assertThat(resultProp.stream().map(Row::toString)) @@ -253,8 +251,7 @@ public void testCreateTableAs() { + "COMMENT 'table comment'"); spark.sql("INSERT INTO t_pk VALUES(1,'aaa','bbb')"); spark.sql("CREATE TABLE t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM t_pk"); - - String tPkAsPath = new Path(warehousePath, "default.db/t_pk_as").toString(); + tablePath = new Path(warehousePath, "default.db/t_pk_as"); assertThat(spark.sql("SHOW CREATE TABLE t_pk_as").collectAsList().toString()) .isEqualTo( String.format( @@ -263,8 +260,8 @@ public void testCreateTableAs() { + "TBLPROPERTIES (\n 'path' = '%s',\n 'primary-key' = 'a')\n]]", showCreateString( "t_pk_as", "a BIGINT NOT NULL", "b STRING", "c STRING"), - tPkAsPath, - tPkAsPath)); + tablePath, + tablePath)); List resultPk = spark.sql("SELECT * FROM t_pk_as").collectAsList(); assertThat(resultPk.stream().map(Row::toString)).containsExactlyInAnyOrder("[1,aaa,bbb]"); @@ -283,8 +280,7 @@ public void testCreateTableAs() { spark.sql("INSERT INTO t_all VALUES(1,2,'bbb','2020-01-01','12')"); spark.sql( "CREATE TABLE t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM t_all"); - - String tAllAsPath = new Path(warehousePath, "default.db/t_all_as").toString(); + tablePath = new Path(warehousePath, "default.db/t_all_as"); assertThat(spark.sql("SHOW CREATE TABLE t_all_as").collectAsList().toString()) .isEqualTo( String.format( @@ -302,8 +298,8 @@ public void testCreateTableAs() { "behavior STRING", "dt STRING NOT NULL", "hh STRING NOT NULL"), - tAllAsPath, - tAllAsPath)); + tablePath, + tablePath)); List resultAll = spark.sql("SELECT * FROM t_all_as").collectAsList(); assertThat(resultAll.stream().map(Row::toString)) .containsExactlyInAnyOrder("[1,2,bbb,2020-01-01,12]"); @@ -380,8 +376,7 @@ public void testShowCreateTable() { + " 'k1' = 'v1'\n" + ")"); - String tablePath = new Path(warehousePath, "default.db/tbl").toString(); - + Path tablePath = new Path(warehousePath, "default.db/tbl"); assertThat(spark.sql("SHOW CREATE TABLE tbl").collectAsList().toString()) .isEqualTo( String.format( diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala index ccd705e26967..842147615d1a 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala @@ -18,7 +18,6 @@ package org.apache.paimon.spark -import 
 import org.apache.paimon.hive.TestHiveMetastore
 
 import org.apache.hadoop.conf.Configuration
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index cf1a71d51fcc..3d268fd34489 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -546,4 +546,28 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
       }
     }
   }
+
+  test("Paimon DDL: create and drop external / managed table") {
+    withTempDir {
+      tbLocation =>
+        withTable("external_tbl", "managed_tbl") {
+          // create external table
+          val error = intercept[UnsupportedOperationException] {
+            sql(
+              s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '${tbLocation.getCanonicalPath}'")
+          }.getMessage
+          assert(error.contains("does not support external tables"))
+
+          // create managed table
+          sql("CREATE TABLE managed_tbl (id INT) USING paimon")
+          val table = loadTable("managed_tbl")
+          val fileIO = table.fileIO()
+          val tableLocation = table.location()
+
+          // drop managed table
+          sql("DROP TABLE managed_tbl")
+          assert(!fileIO.exists(tableLocation))
+        }
+    }
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 56922ae2aeff..bfd6716b2128 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -297,6 +297,56 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
     }
   }
 
+  test("Paimon DDL with hive catalog: create and drop external / managed table") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        spark.sql(s"USE $catalogName")
+        withTempDir {
+          tbLocation =>
+            withDatabase("paimon_db") {
+              spark.sql(s"CREATE DATABASE paimon_db")
+              spark.sql(s"USE paimon_db")
+              withTable("external_tbl", "managed_tbl") {
+                val expectedTbLocation = tbLocation.getCanonicalPath
+                // create external table
+                spark.sql(
+                  s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '$expectedTbLocation'")
+                spark.sql("INSERT INTO external_tbl VALUES (1)")
+                checkAnswer(spark.sql("SELECT * FROM external_tbl"), Row(1))
+                val table = loadTable("paimon_db", "external_tbl")
+                val fileIO = table.fileIO()
+                val actualTbLocation = table.location()
+                assert(actualTbLocation.toString.split(':').apply(1).equals(expectedTbLocation))
+
+                // drop external table
+                spark.sql("DROP TABLE external_tbl")
+                assert(fileIO.exists(actualTbLocation))
+
+                // create external table again using the same location
+                spark.sql(
+                  s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '$expectedTbLocation'")
+                checkAnswer(spark.sql("SELECT * FROM external_tbl"), Row(1))
+                assert(
+                  loadTable("paimon_db", "external_tbl")
+                    .location()
+                    .toString
+                    .split(':')
+                    .apply(1)
+                    .equals(expectedTbLocation))
+
+                // create managed table
+                spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
+                val managedTbLocation = loadTable("paimon_db", "managed_tbl").location()
+
+                // drop managed table
+                spark.sql("DROP TABLE managed_tbl")
+                assert(!fileIO.exists(managedTbLocation))
+              }
+            }
+        }
+    }
+  }
+
   def getDatabaseProp(dbName: String, propertyName: String): String = {
     spark
       .sql(s"DESC DATABASE EXTENDED $dbName")