From c6387a669d6cd0611c352dbfe4d241fea6a6484c Mon Sep 17 00:00:00 2001
From: Zouxxyy
Date: Tue, 10 Dec 2024 14:41:37 +0800
Subject: [PATCH] [hive][spark] Support creating external table without schema
 when the table already exists (#4638)

---
 docs/content/spark/sql-ddl.md                 | 29 ++++++-
 docs/content/spark/sql-write.md               | 31 +++++---
 .../apache/paimon/schema/SchemaManager.java   | 54 +++++++++----
 .../org/apache/paimon/hive/HiveCatalog.java   | 10 +--
 .../org/apache/paimon/spark/SparkCatalog.java | 30 ++------
 .../org/apache/paimon/spark/SparkSource.scala |  2 +-
 .../apache/paimon/spark/sql/DDLTestBase.scala |  2 +-
 .../sql/DDLWithHiveCatalogTestBase.scala      | 75 ++++++++++++++-----
 8 files changed, 157 insertions(+), 76 deletions(-)

diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index 638a21a7042a..cfe105f6ac00 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -156,6 +156,33 @@ CREATE TABLE my_table (
 );
 ```
 
+### Create External Table
+
+When the catalog's `metastore` type is `hive`, if a `location` is specified when creating a table, that table will be considered an external table; otherwise, it will be a managed table.
+
+When you drop an external table, only the metadata in Hive will be removed; the actual data files will not be deleted. Dropping a managed table, by contrast, will also delete the data.
+
+```sql
+CREATE TABLE my_table (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING,
+    dt STRING,
+    hh STRING
+) PARTITIONED BY (dt, hh) TBLPROPERTIES (
+    'primary-key' = 'dt,hh,user_id'
+) LOCATION '/path/to/table';
+```
+
+Furthermore, if data is already stored in the specified location, you can create the table without explicitly specifying its fields, partitions, properties, or other metadata.
+In this case, the new table inherits all of them from the existing table's metadata.
+
+However, if you do specify them manually, you must ensure that they are consistent with those of the existing table (the specified properties may be a subset of the existing ones). Therefore, it is strongly recommended not to specify them.
+
+```sql
+CREATE TABLE my_table LOCATION '/path/to/table';
+```
+
 ### Create Table As Select
 
 Table can be created and populated by the results of a query, for example, we have a sql like this: `CREATE TABLE table_b AS SELECT id, name FORM table_a`,
@@ -241,7 +268,7 @@ DROP VIEW v1;
 ```
 
 ## Tag
-### Create or Replace Tag
+### Create Or Replace Tag
 Create or replace a tag syntax with the following options.
 - Create a tag with or without the snapshot id and time retention.
 - Create an existed tag is not failed if using `IF NOT EXISTS` syntax.
diff --git a/docs/content/spark/sql-write.md b/docs/content/spark/sql-write.md
index 5f4fa2dabc9f..c3afcd3754c8 100644
--- a/docs/content/spark/sql-write.md
+++ b/docs/content/spark/sql-write.md
@@ -120,7 +120,17 @@ TRUNCATE TABLE my_table;
 
 ## Update Table
 
-spark supports update PrimitiveType and StructType, for example:
+Updates the column values for the rows that match a predicate. When no predicate is provided, updates the column values for all rows.
+
+Note:
+
+{{< hint info >}}
+
+Updating primary key columns is not supported when the target table is a primary key table.
+
+{{< /hint >}}
+
+Spark supports updating PrimitiveType and StructType, for example:
 
 ```sql
 -- Syntax
@@ -142,17 +152,22 @@ UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
 
 ## Delete From Table
 
+Deletes the rows that match a predicate. When no predicate is provided, deletes all rows.
+
 ```sql
 DELETE FROM my_table WHERE currency = 'UNKNOWN';
 ```
 
 ## Merge Into Table
 
-Paimon currently supports Merge Into syntax in Spark 3+, which allow a set of updates, insertions and deletions based on a source table in a single commit.
+Merges a set of updates, insertions and deletions based on a source table into a target table.
+
+Note:
+
+{{< hint info >}}
+
+In the update clause, updating primary key columns is not supported when the target table is a primary key table.
 
-{{< hint into >}}
-1. In update clause, to update primary key columns is not supported.
-2. `WHEN NOT MATCHED BY SOURCE` syntax is not supported.
 {{< /hint >}}
 
 **Example: One**
@@ -160,7 +175,6 @@ Paimon currently supports Merge Into syntax in Spark 3+, which allow a set of up
 This is a simple demo that, if a row exists in the target table update it, else insert it.
 
 ```sql
-
 -- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
 
 MERGE INTO target
 USING source
 ON target.a = source.a
 WHEN MATCHED THEN
 UPDATE SET *
 WHEN NOT MATCHED
 THEN INSERT *
-
 ```
 
 **Example: Two**
@@ -178,7 +191,6 @@ THEN INSERT *
 This is a demo with multiple, conditional clauses.
 
 ```sql
-
 -- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.
 
 MERGE INTO target
 USING source
 ON target.a = source.a
 WHEN MATCHED AND target.a = 5 THEN
 UPDATE SET b = source.b + target.b      -- when matched and meet the condition 1, then update b;
 WHEN MATCHED AND source.c > 'c2' THEN
 UPDATE SET *    -- when matched and meet the condition 2, then update all the columns;
 WHEN MATCHED THEN
 DELETE      -- when matched, delete this row in target table;
@@ -194,15 +206,12 @@ WHEN NOT MATCHED AND c > 'c9' THEN
 INSERT (a, b, c) VALUES (a, b * 1.1, c)     -- when not matched but meet the condition 3, then transform and insert this row;
 WHEN NOT MATCHED THEN
 INSERT *    -- when not matched, insert this row without any transformation;
-
 ```
 
 ## Streaming Write
 
 {{< hint info >}}
 
-Paimon currently supports Spark 3+ for streaming write.
-
 Paimon Structured Streaming only supports the two `append` and `complete` modes.
 
 {{< /hint >}}
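+
+The following sketch is illustrative rather than part of the original docs: it shows a minimal structured streaming write in `append` mode. The `rate` source, checkpoint path, and table path are placeholder assumptions.
+
+```scala
+// Build a small streaming DataFrame; `rate` is Spark's built-in test source.
+val inputStream = spark.readStream
+  .format("rate")
+  .load()
+  .selectExpr("value AS id")
+
+// Continuously append the stream to a Paimon table path.
+val query = inputStream.writeStream
+  .format("paimon")
+  .outputMode("append") // Paimon supports only the append and complete modes
+  .option("checkpointLocation", "/tmp/paimon/checkpoint")
+  .start("/path/to/paimon/table")
+```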
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 83ddbccfef98..2139dca4a990 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -208,24 +208,18 @@ public TableSchema createTable(Schema schema) throws Exception {
         return createTable(schema, false);
     }
 
-    public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws Exception {
+    public TableSchema createTable(Schema schema, boolean externalTable) throws Exception {
         while (true) {
             Optional<TableSchema> latest = latest();
             if (latest.isPresent()) {
-                TableSchema oldSchema = latest.get();
-                boolean isSame =
-                        Objects.equals(oldSchema.fields(), schema.fields())
-                                && Objects.equals(oldSchema.partitionKeys(), schema.partitionKeys())
-                                && Objects.equals(oldSchema.primaryKeys(), schema.primaryKeys())
-                                && Objects.equals(oldSchema.options(), schema.options());
-                if (ignoreIfExistsSame && isSame) {
-                    return oldSchema;
+                TableSchema latestSchema = latest.get();
+                if (externalTable) {
+                    checkSchemaForExternalTable(latestSchema, schema);
+                    return latestSchema;
+                } else {
+                    throw new IllegalStateException(
+                            "Schema in filesystem exists, creation is not allowed.");
                 }
-
-                throw new IllegalStateException(
-                        "Schema in filesystem exists, please use updating,"
-                                + " latest schema is: "
-                                + oldSchema);
             }
 
             List<DataField> fields = schema.fields();
@@ -254,6 +248,38 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws
         }
     }
 
+    private void checkSchemaForExternalTable(TableSchema existsSchema, Schema newSchema) {
+        // When creating an external table, if the table already exists in the location, we can
+        // choose not to specify the fields.
+        if (newSchema.fields().isEmpty()
+                // When the fields are explicitly specified, we need to check for consistency.
+                || (Objects.equals(existsSchema.fields(), newSchema.fields())
+                        && Objects.equals(existsSchema.partitionKeys(), newSchema.partitionKeys())
+                        && Objects.equals(existsSchema.primaryKeys(), newSchema.primaryKeys()))) {
+            // check for options
+            Map<String, String> existsOptions = existsSchema.options();
+            Map<String, String> newOptions = newSchema.options();
+            newOptions.forEach(
+                    (key, value) -> {
+                        if (!key.equals(Catalog.OWNER_PROP)
+                                && (!existsOptions.containsKey(key)
+                                        || !existsOptions.get(key).equals(value))) {
+                            throw new RuntimeException(
+                                    "New schema's options are not equal to the existing schema's, new schema: "
+                                            + newOptions
+                                            + ", existing schema: "
+                                            + existsOptions);
+                        }
+                    });
+        } else {
+            throw new RuntimeException(
+                    "New schema is not equal to the existing schema, new schema: "
+                            + newSchema
+                            + ", existing schema: "
+                            + existsSchema);
+        }
+    }
+
     /** Update {@link SchemaChange}s. */
     public TableSchema commitChanges(SchemaChange... changes) throws Exception {
         return commitChanges(Arrays.asList(changes));
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 151e2b4d2c07..c74ede981546 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -720,11 +720,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
         try {
             tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
         } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to commit changes of table "
-                            + identifier.getFullName()
-                            + " to underlying files.",
-                    e);
+            throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
         }
 
         try {
@@ -735,7 +731,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
                                     identifier, tableSchema, location, externalTable)));
         } catch (Exception e) {
             try {
-                fileIO.deleteDirectoryQuietly(location);
+                if (!externalTable) {
+                    fileIO.deleteDirectoryQuietly(location);
+                }
             } catch (Exception ee) {
                 LOG.error("Delete directory[{}] fail for table {}", location, identifier, ee);
             }
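The consistency rules enforced by `checkSchemaForExternalTable` above can be summarized with a sketch; this is illustrative only, and the table names, location, and pre-existing options are assumptions:

```scala
// Assume a Paimon table already exists at '/path/to/table' with fields
// (id INT, pt INT), partition key pt, and table option 'k1' = 'v1'.

// Succeeds: no schema given, everything is inherited from the existing metadata.
spark.sql("CREATE TABLE t_a USING paimon LOCATION '/path/to/table'")

// Succeeds: explicitly specified options may be a subset of the existing ones.
spark.sql("CREATE TABLE t_b USING paimon TBLPROPERTIES ('k1' = 'v1') LOCATION '/path/to/table'")

// Fails: explicitly specified fields must match the existing schema exactly.
spark.sql("CREATE TABLE t_c (other_col INT) USING paimon LOCATION '/path/to/table'")

// Fails: 'k2' is not among the existing table's options.
spark.sql("CREATE TABLE t_d USING paimon TBLPROPERTIES ('k2' = 'v2') LOCATION '/path/to/table'")
```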
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 5ad1b13b7c7b..d6318c723fe0 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -75,7 +75,6 @@
 import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
 import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
 import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Spark {@link TableCatalog} for paimon. */
 public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, SupportView {
@@ -298,26 +297,8 @@ public org.apache.spark.sql.connector.catalog.Table createTable(
             Map<String, String> properties)
             throws TableAlreadyExistsException, NoSuchNamespaceException {
         try {
-            String provider = properties.get(TableCatalog.PROP_PROVIDER);
-            if ((!usePaimon(provider))
-                    && SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
-                Map<String, String> newProperties = new HashMap<>(properties);
-                newProperties.put(TYPE.key(), FORMAT_TABLE.toString());
-                newProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
-                catalog.createTable(
-                        toIdentifier(ident),
-                        toInitialSchema(schema, partitions, newProperties),
-                        false);
-            } else {
-                checkArgument(
-                        usePaimon(provider),
-                        "SparkCatalog can only create paimon table, but current provider is %s",
-                        provider);
-                catalog.createTable(
-                        toIdentifier(ident),
-                        toInitialSchema(schema, partitions, properties),
-                        false);
-            }
+            catalog.createTable(
+                    toIdentifier(ident), toInitialSchema(schema, partitions, properties), false);
             return loadTable(ident);
         } catch (Catalog.TableAlreadyExistException e) {
             throw new TableAlreadyExistsException(ident);
@@ -406,9 +387,12 @@ private static SchemaChange.Move getMove(
     private Schema toInitialSchema(
             StructType schema, Transform[] partitions, Map<String, String> properties) {
         Map<String, String> normalizedProperties = new HashMap<>(properties);
-        if (!normalizedProperties.containsKey(TableCatalog.PROP_PROVIDER)) {
-            normalizedProperties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME());
+        String provider = properties.get(TableCatalog.PROP_PROVIDER);
+        if (!usePaimon(provider) && SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
+            normalizedProperties.put(TYPE.key(), FORMAT_TABLE.toString());
+            normalizedProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
         }
+        normalizedProperties.remove(TableCatalog.PROP_PROVIDER);
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
         normalizedProperties.remove(TableCatalog.PROP_COMMENT);
         if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 0170a29f68d3..d80d7350a655 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -118,7 +118,7 @@ object SparkSource {
 
   val NAME = "paimon"
 
-  val FORMAT_NAMES = Seq("csv", "orc", "parquet")
+  val FORMAT_NAMES: Seq[String] = Seq("csv", "orc", "parquet")
 
   def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
     new BaseRelation {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 6ad5274496a9..3ed2c98306fb 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -161,7 +161,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
   test("Paimon DDL: create table without using paimon") {
     withTable("paimon_tbl") {
       sql("CREATE TABLE paimon_tbl (id int)")
-      assert(loadTable("paimon_tbl").options().get("provider").equals("paimon"))
+      assert(!loadTable("paimon_tbl").options().containsKey("provider"))
     }
   }
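With the SparkCatalog change above, a file-format provider is now mapped to a Paimon format table during schema conversion, and the provider key is no longer persisted in the table options. A hedged sketch of the observable behavior (the table name is illustrative; `type` and `file.format` are the keys behind `TYPE` and `FILE_FORMAT`):

```scala
// Creating a table with a non-paimon provider from FORMAT_NAMES now yields a
// Paimon format table instead of failing with "can only create paimon table".
spark.sql("CREATE TABLE fmt_tbl (id INT, name STRING) USING csv")

// The schema conversion records 'type' = 'format-table' and 'file.format' = 'csv',
// while the 'provider' key itself is removed from the stored options
// (as asserted by the DDLTestBase change above).
```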
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index e99e4434ef7f..1189f1f2906b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -326,13 +326,7 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
               spark.sql(
                 s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '$expertTbLocation'")
               checkAnswer(spark.sql("SELECT * FROM external_tbl"), Row(1))
-              assert(
-                loadTable("paimon_db", "external_tbl")
-                  .location()
-                  .toString
-                  .split(':')
-                  .apply(1)
-                  .equals(expertTbLocation))
+              assert(getActualTableLocation("paimon_db", "external_tbl").equals(expertTbLocation))
 
               // create managed table
               spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
@@ -373,12 +367,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
               spark.sql("ALTER TABLE external_tbl RENAME TO external_tbl_renamed")
               checkAnswer(spark.sql("SELECT * FROM external_tbl_renamed"), Row(1))
               assert(
-                loadTable("paimon_db", "external_tbl_renamed")
-                  .location()
-                  .toString
-                  .split(':')
-                  .apply(1)
-                  .equals(expertTbLocation))
+                getActualTableLocation("paimon_db", "external_tbl_renamed").equals(
+                  expertTbLocation))
 
               // create managed table
               spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
@@ -389,12 +379,55 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
               spark.sql("ALTER TABLE managed_tbl RENAME TO managed_tbl_renamed")
               checkAnswer(spark.sql("SELECT * FROM managed_tbl_renamed"), Row(1))
               assert(
-                !loadTable("paimon_db", "managed_tbl_renamed")
-                  .location()
-                  .toString
-                  .split(':')
-                  .apply(1)
-                  .equals(managedTbLocation.toString))
+                !getActualTableLocation("paimon_db", "managed_tbl_renamed").equals(
+                  managedTbLocation.toString))
           }
         }
       }
     }
   }
 
+  test("Paimon DDL with hive catalog: create external table without schema") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        spark.sql(s"USE $catalogName")
+        withTempDir {
+          tbLocation =>
+            withDatabase("paimon_db") {
+              spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon_db")
+              spark.sql(s"USE paimon_db")
+              withTable("t1", "t2", "t3", "t4", "t5") {
+                val expertTbLocation = tbLocation.getCanonicalPath
+                spark.sql(s"""
+                             |CREATE TABLE t1 (id INT, pt INT) USING paimon
+                             |PARTITIONED BY (pt)
+                             |TBLPROPERTIES('primary-key' = 'id', 'k1' = 'v1')
+                             |LOCATION '$expertTbLocation'
+                             |""".stripMargin)
+                spark.sql("INSERT INTO t1 VALUES (1, 1)")
+
+                // create table without schema
+                spark.sql(s"CREATE TABLE t2 USING paimon LOCATION '$expertTbLocation'")
+                checkAnswer(spark.sql("SELECT * FROM t2"), Row(1, 1))
+                assert(getActualTableLocation("paimon_db", "t2").equals(expertTbLocation))
+
+                // create table with wrong schema
+                intercept[Exception] {
+                  spark.sql(
+                    s"CREATE TABLE t3 (fake_col INT) USING paimon LOCATION '$expertTbLocation'")
+                }
+
+                // create table with existing props
+                spark.sql(
+                  s"CREATE TABLE t4 USING paimon TBLPROPERTIES ('k1' = 'v1') LOCATION '$expertTbLocation'")
+                checkAnswer(spark.sql("SELECT * FROM t4"), Row(1, 1))
+                assert(getActualTableLocation("paimon_db", "t4").equals(expertTbLocation))
+
+                // create table with new props
+                intercept[Exception] {
+                  spark.sql(
+                    s"CREATE TABLE t5 USING paimon TBLPROPERTIES ('k2' = 'v2') LOCATION '$expertTbLocation'")
+                }
+              }
+            }
+        }
+    }
+  }
+
@@ -445,4 +478,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
         .toMap
     tableProps("path").split(":")(1)
   }
+
+  def getActualTableLocation(dbName: String, tblName: String): String = {
+    loadTable(dbName, tblName).location().toString.split(':').apply(1)
+  }
 }
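Taken together, the patch enables the following workflow. This closing sketch is illustrative only (database, table, and location names are assumptions) and mirrors the tests above:

```scala
// Create an external table at an explicit location and write a row.
spark.sql("CREATE TABLE paimon_db.orders (id BIGINT, amount DOUBLE) USING paimon LOCATION '/warehouse/orders'")
spark.sql("INSERT INTO paimon_db.orders VALUES (1, 9.9)")

// Dropping an external table only removes the Hive metadata; data files remain.
spark.sql("DROP TABLE paimon_db.orders")

// With this patch, the table can be re-registered without repeating its schema:
// fields, partition keys, and properties are inherited from the existing metadata.
spark.sql("CREATE TABLE paimon_db.orders USING paimon LOCATION '/warehouse/orders'")
spark.sql("SELECT * FROM paimon_db.orders").show() // prints the previously written row
```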