[test][spark] Add insert with column list test case (#4654)
Zouxxyy authored Dec 6, 2024
1 parent 315f8c0 commit 69bdfe0
Showing 2 changed files with 79 additions and 12 deletions.
39 changes: 27 additions & 12 deletions docs/content/spark/sql-write.md
@@ -26,41 +26,54 @@ under the License.

# SQL Write

## Insert Table

The `INSERT` statement inserts new rows into a table or overwrites the existing data in the table. The inserted rows can be specified by value expressions or by the result of a query.

**Syntax**

```sql
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
```
**Parameters**

- **table_identifier**: Specifies a table name, which may be optionally qualified with a database name.

- **part_spec**: An optional parameter that specifies a comma-separated list of key and value pairs for partitions.

- **column_list**: An optional parameter that specifies a comma-separated list of columns belonging to the table_identifier table. Spark will reorder the columns of the input query to match the table schema according to the specified column list.

Note: Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target table will automatically add the corresponding default values for the remaining columns (or NULL for any column lacking an explicitly-assigned default value). In Spark 3.3 or earlier, column_list's size must be equal to the target table's column size, otherwise these commands would have failed.

- **value_expr** ( { value | NULL } [ , … ] ) [ , ( … ) ]: Specifies the values to be inserted. Either an explicitly specified value or a NULL can be inserted. A comma must be used to separate each value in the clause. More than one set of values can be specified to insert multiple rows.

For more information, please check the syntax document: [Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
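Putting these parameters together, a minimal sketch (the `students` table and its columns are illustrative, not part of Paimon's API):

```sql
-- hypothetical partitioned table used only for illustration
CREATE TABLE students (name STRING, student_id INT) PARTITIONED BY (address STRING);

-- the column list reorders the input to match the table schema
INSERT INTO students (address, name, student_id) VALUES ('Hangzhou', 'a', 1);

-- Spark 3.4+: omitted columns take their default value, or NULL if none is defined
INSERT INTO students (name) VALUES ('b');
```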

### Insert Into

Use `INSERT INTO` to apply records and changes to tables.

```sql
INSERT INTO my_table SELECT ...
```

### Insert Overwrite

Use `INSERT OVERWRITE` to overwrite the whole table.

```sql
INSERT OVERWRITE my_table SELECT ...
```

#### Insert Overwrite Partition

Use `INSERT OVERWRITE` to overwrite a partition.

```sql
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
```

#### Dynamic Overwrite Partition

Spark's default overwrite mode is `static` partition overwrite. To enable dynamic overwrite, set the Spark session configuration `spark.sql.sources.partitionOverwriteMode` to `dynamic`.
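As a sketch, the setting can be enabled per session before the write (the `staging` table and the `dt` partition column are hypothetical):

```sql
-- overwrite only the partitions produced by the query; other partitions are kept
SET spark.sql.sources.partitionOverwriteMode = dynamic;
INSERT OVERWRITE my_table SELECT * FROM staging WHERE dt = '2024-12-06';
```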

@@ -97,13 +110,15 @@
SELECT * FROM my_table;
*/
```

## Truncate Table

The `TRUNCATE TABLE` statement removes all the rows from a table or partition(s).

```sql
TRUNCATE TABLE my_table;
```

## Update Table

Spark supports updating both primitive-type and struct-type columns, for example:

@@ -125,13 +140,13 @@
UPDATE t SET name = 'a_new' WHERE id = 1;
UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
```

## Delete From Table

```sql
DELETE FROM my_table WHERE currency = 'UNKNOWN';
```

## Merge Into Table

Paimon currently supports the Merge Into syntax in Spark 3+, which allows a set of updates, insertions, and deletions based on a source table to be applied in a single commit.
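A minimal sketch of the statement shape (the `target` and `source` tables and the `id`/`op` columns are hypothetical):

```sql
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED AND s.op = 'delete' THEN DELETE   -- drop matched rows flagged for deletion
WHEN MATCHED THEN UPDATE SET *                 -- otherwise overwrite the matched row
WHEN NOT MATCHED THEN INSERT *;                -- insert rows with no match in the target
```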

@@ -508,4 +508,56 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
) :: Nil
)
}

test("Paimon Insert: insert with column list") {
sql("CREATE TABLE T (name String, student_id INT) PARTITIONED BY (address STRING)")

// insert with a column list
sql("INSERT INTO T (name, student_id, address) VALUES ('a', '1', 'Hangzhou')")
// Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target
// table will automatically add the corresponding default values for the remaining columns (or NULL for any column
// lacking an explicitly-assigned default value). In Spark 3.3 or earlier, these commands would have failed.
// See https://issues.apache.org/jira/browse/SPARK-42521
if (gteqSpark3_4) {
sql("INSERT INTO T (name) VALUES ('b')")
sql("INSERT INTO T (address, name) VALUES ('Hangzhou', 'c')")
} else {
sql("INSERT INTO T (name, student_id, address) VALUES ('b', null, null)")
sql("INSERT INTO T (name, student_id, address) VALUES ('c', null, 'Hangzhou')")
}

// insert with both a partition spec and a column list
if (gteqSpark3_4) {
sql("INSERT INTO T PARTITION (address='Beijing') (name) VALUES ('d')")
} else {
sql("INSERT INTO T PARTITION (address='Beijing') (name, student_id) VALUES ('d', null)")
}
sql("INSERT INTO T PARTITION (address='Hangzhou') (student_id, name) VALUES (5, 'e')")

checkAnswer(
sql("SELECT * FROM T ORDER BY name"),
Seq(
Row("a", 1, "Hangzhou"),
Row("b", null, null),
Row("c", null, "Hangzhou"),
Row("d", null, "Beijing"),
Row("e", 5, "Hangzhou"))
)

// insert overwrite with a column list
if (gteqSpark3_4) {
sql("INSERT OVERWRITE T (name, address) VALUES ('f', 'Shanghai')")
} else {
sql("INSERT OVERWRITE T (name, student_id, address) VALUES ('f', null, 'Shanghai')")
}
checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("f", null, "Shanghai"))

// insert overwrite with both a partition spec and a column list
if (gteqSpark3_4) {
sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name) VALUES ('g')")
} else {
sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name, student_id) VALUES ('g', null)")
}
checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("g", null, "Shanghai"))
}
}
