From 69bdfe089089e3c718a0df57662f7996d778f75d Mon Sep 17 00:00:00 2001
From: Zouxxyy
Date: Sat, 7 Dec 2024 00:15:37 +0800
Subject: [PATCH] [test][spark] Add insert with column list test case (#4654)

---
 docs/content/spark/sql-write.md             | 39 +++++++++-----
 .../sql/InsertOverwriteTableTestBase.scala  | 52 +++++++++++++++++++
 2 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/docs/content/spark/sql-write.md b/docs/content/spark/sql-write.md
index d2777110914f..5f4fa2dabc9f 100644
--- a/docs/content/spark/sql-write.md
+++ b/docs/content/spark/sql-write.md
@@ -26,17 +26,30 @@ under the License.
 
 # SQL Write
 
-## Syntax
+## Insert Table
+
+The `INSERT` statement inserts new rows into a table or overwrites the existing data in the table. The inserted rows can be specified by value expressions or result from a query.
+
+**Syntax**
 
 ```sql
 INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
 ```
 
+**Parameters**
+
+- **table_identifier**: Specifies a table name, which may be optionally qualified with a database name.
+
+- **part_spec**: An optional parameter that specifies a comma-separated list of key and value pairs for partitions.
 
-For more information, please check the syntax document:
+- **column_list**: An optional parameter that specifies a comma-separated list of columns belonging to the table_identifier table. Spark will reorder the columns of the input query to match the table schema according to the specified column list.
 
-[Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
+  Note: Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target table will automatically add the corresponding default values for the remaining columns (or NULL for any column lacking an explicitly-assigned default value). In Spark 3.3 or earlier, the column list's size had to equal the target table's column count, otherwise these commands would have failed; see the worked example after this list.
 
-## INSERT INTO
+- **value_expr** ( { value | NULL } [ , … ] ) [ , ( … ) ]: Specifies the values to be inserted. Either an explicitly specified value or a NULL can be inserted. A comma must be used to separate each value in the clause. More than one set of values can be specified to insert multiple rows.
+
+For more information, please check the syntax document: [Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
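As a worked example of the column-list rules above, here is a minimal sketch for illustration only; the `students` table is hypothetical (it is not part of this patch), and the default-value filling shown for the second insert assumes Spark 3.4 or later:

```sql
-- Hypothetical table, for illustration only
CREATE TABLE students (name STRING, student_id INT, address STRING);

-- The column list may reorder columns: Spark maps each value back to the
-- table schema, so this row is (name='a', student_id=1, address='Hangzhou')
INSERT INTO students (address, name, student_id) VALUES ('Hangzhou', 'a', 1);

-- Spark 3.4+: the omitted columns (student_id, address) are filled with their
-- default values, or NULL where no default is defined; in Spark 3.3 or
-- earlier this statement would fail
INSERT INTO students (name) VALUES ('b');
```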
+
+### Insert Into
 
 Use `INSERT INTO` to apply records and changes to tables.
 
@@ -44,15 +57,15 @@
 INSERT INTO my_table SELECT ...
 ```
 
-## Overwriting the Whole Table
+### Insert Overwrite
 
-Use `INSERT OVERWRITE` to overwrite the whole unpartitioned table.
+Use `INSERT OVERWRITE` to overwrite the whole table.
 
 ```sql
 INSERT OVERWRITE my_table SELECT ...
 ```
 
-### Overwriting a Partition
+#### Insert Overwrite Partition
 
 Use `INSERT OVERWRITE` to overwrite a partition.
 
@@ -60,7 +73,7 @@
 INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
 ```
 
-### Dynamic Overwrite
+#### Dynamic Overwrite Partition
 
 Spark's default overwrite mode is `static` partition overwrite. To enable dynamic overwrite, you need to set the Spark session configuration `spark.sql.sources.partitionOverwriteMode` to `dynamic`.
 
@@ -97,13 +110,15 @@
 SELECT * FROM my_table;
 */
 ```
 
-## Truncate tables
+## Truncate Table
+
+The `TRUNCATE TABLE` statement removes all the rows from a table or partition(s).
 
 ```sql
 TRUNCATE TABLE my_table;
 ```
 
-## Updating tables
+## Update Table
 
 Spark supports updating PrimitiveType and StructType columns, for example:
 
@@ -125,13 +140,13 @@
 UPDATE t SET name = 'a_new' WHERE id = 1;
 UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
 ```
 
-## Deleting from table
+## Delete From Table
 
 ```sql
 DELETE FROM my_table WHERE currency = 'UNKNOWN';
 ```
 
-## Merging into table
+## Merge Into Table
 
 Paimon currently supports Merge Into syntax in Spark 3+, which allows a set of updates, insertions and deletions based on a source table in a single commit.
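Since the Merge Into section above closes the documentation changes without an inline example, here is a minimal sketch of the `MERGE INTO` shape it refers to; the `target` and `source` tables and the `id` join key are hypothetical stand-ins, not part of this patch:

```sql
-- Upsert: matched rows are updated, unmatched source rows are inserted
MERGE INTO target
USING source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```

Both the update and the insert are applied to the target table together, matching the "single commit" wording above.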
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index 03026e857429..977b74707069 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -508,4 +508,56 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
       ) :: Nil
     )
   }
+
+  test("Paimon Insert: insert with column list") {
+    sql("CREATE TABLE T (name String, student_id INT) PARTITIONED BY (address STRING)")
+
+    // insert with a column list
+    sql("INSERT INTO T (name, student_id, address) VALUES ('a', '1', 'Hangzhou')")
+    // Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target
+    // table will automatically add the corresponding default values for the remaining columns (or NULL for any column
+    // lacking an explicitly-assigned default value). In Spark 3.3 or earlier, these commands would have failed.
+    // See https://issues.apache.org/jira/browse/SPARK-42521
+    if (gteqSpark3_4) {
+      sql("INSERT INTO T (name) VALUES ('b')")
+      sql("INSERT INTO T (address, name) VALUES ('Hangzhou', 'c')")
+    } else {
+      sql("INSERT INTO T (name, student_id, address) VALUES ('b', null, null)")
+      sql("INSERT INTO T (name, student_id, address) VALUES ('c', null, 'Hangzhou')")
+    }
+
+    // insert with both a partition spec and a column list
+    if (gteqSpark3_4) {
+      sql("INSERT INTO T PARTITION (address='Beijing') (name) VALUES ('d')")
+    } else {
+      sql("INSERT INTO T PARTITION (address='Beijing') (name, student_id) VALUES ('d', null)")
+    }
+    sql("INSERT INTO T PARTITION (address='Hangzhou') (student_id, name) VALUES (5, 'e')")
+
+    checkAnswer(
+      sql("SELECT * FROM T ORDER BY name"),
+      Seq(
+        Row("a", 1, "Hangzhou"),
+        Row("b", null, null),
+        Row("c", null, "Hangzhou"),
+        Row("d", null, "Beijing"),
+        Row("e", 5, "Hangzhou"))
+    )
+
+    // insert overwrite with a column list
+    if (gteqSpark3_4) {
+      sql("INSERT OVERWRITE T (name, address) VALUES ('f', 'Shanghai')")
+    } else {
+      sql("INSERT OVERWRITE T (name, student_id, address) VALUES ('f', null, 'Shanghai')")
+    }
+    checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("f", null, "Shanghai"))
+
+    // insert overwrite with both a partition spec and a column list
+    if (gteqSpark3_4) {
+      sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name) VALUES ('g')")
+    } else {
+      sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name, student_id) VALUES ('g', null)")
+    }
+    checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("g", null, "Shanghai"))
+  }
 }