[spark] support subquery for spark update/delete (#3162)
YannByron authored Apr 7, 2024
1 parent 8aaf249 commit a26b46f
Showing 6 changed files with 119 additions and 23 deletions.
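In short: DELETE and UPDATE on a Paimon table without primary keys used to reject any condition containing a subquery (the guard removed below threw "Subqueries are not supported", as the old tests asserted). This commit drops that guard so such conditions are planned by Spark normally, and extends the tests to cover scalar, IN/NOT IN, and EXISTS/NOT EXISTS subqueries. A minimal sketch of the newly supported usage, reusing the table and temp view set up in the new tests (spark-shell style, not part of the commit itself):

    import spark.implicits._

    // Append-only Paimon table and a temp view, as set up in the tests below.
    spark.sql("CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)")
    spark.sql("INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024')")
    Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")

    // Previously rejected with "Subqueries are not supported"; now planned normally.
    spark.sql("DELETE FROM T WHERE id IN (SELECT key FROM source)")
    spark.sql("UPDATE T SET name = 'matched' WHERE EXISTS (SELECT * FROM source WHERE key > 5)")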
@@ -38,11 +38,6 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
       table.getTable match {
         case paimonTable: FileStoreTable =>
-          val primaryKeys = paimonTable.primaryKeys().asScala
-          if (primaryKeys.isEmpty) {
-            condition.foreach(checkSubquery)
-          }
-
           val relation = PaimonRelation.getPaimonRelation(d.table)
           DeleteFromPaimonTableCommand(relation, paimonTable, condition.getOrElse(TrueLiteral))
 
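The guard removed above ran only when `primaryKeys.isEmpty` and delegated to `checkSubquery`, a helper from the `RowLevelHelper` trait that this diff does not show. A plausible reconstruction of that helper, assuming it walks the condition with Spark's `SubqueryExpression.hasSubquery` and using the message asserted by the old tests (the real Paimon helper may differ):

    import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}

    // Hypothetical reconstruction of RowLevelHelper.checkSubquery: reject any
    // condition that contains a subquery expression anywhere in its tree.
    // The message matches the one asserted in the removed tests below.
    def checkSubquery(condition: Expression): Unit = {
      if (SubqueryExpression.hasSubquery(condition)) {
        throw new UnsupportedOperationException(
          s"Subqueries are not supported in the condition: $condition")
      }
    }

The next hunk makes the same removal in a second copy of `PaimonDeleteTable`, presumably the variant kept for another supported Spark version; note that its `condition` is a plain `Expression` rather than an `Option`, hence `checkSubquery(condition)` instead of `condition.foreach(checkSubquery)`.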
@@ -37,11 +37,6 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
       table.getTable match {
         case paimonTable: FileStoreTable =>
-          val primaryKeys = paimonTable.primaryKeys().asScala
-          if (primaryKeys.isEmpty) {
-            checkSubquery(condition)
-          }
-
           val relation = PaimonRelation.getPaimonRelation(d.table)
           DeleteFromPaimonTableCommand(relation, paimonTable, condition)
 
@@ -42,9 +42,6 @@ object PaimonUpdateTable
       table.getTable match {
         case paimonTable: FileStoreTable =>
           val primaryKeys = paimonTable.primaryKeys().asScala
-          if (primaryKeys.isEmpty) {
-            condition.foreach(checkSubquery)
-          }
           if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) {
            throw new RuntimeException("Can't update the primary key column.")
           }
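`PaimonUpdateTable` still needs `primaryKeys` after the removal, because the `validUpdateAssignment` check (also defined in `RowLevelHelper`, outside this diff) continues to forbid updating primary-key columns. A hedged sketch of what that helper plausibly verifies, inferred only from the call site above; the signature and body are assumptions, not the actual Paimon implementation:

    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
    import org.apache.spark.sql.catalyst.plans.logical.Assignment

    // Sketch only: an UPDATE is acceptable as long as no assignment writes to
    // a primary-key column. `tableOutput` mirrors `u.table.outputSet` at the
    // call site, though this sketch needs it only for the signature.
    def validUpdateAssignment(
        tableOutput: AttributeSet,
        primaryKeys: Seq[String],
        assignments: Seq[Assignment]): Boolean = {
      assignments.forall {
        case Assignment(attr: Attribute, _) => !primaryKeys.contains(attr.name)
        case _ => true
      }
    }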
@@ -33,13 +33,12 @@ import org.apache.spark.sql.Utils.createDataset
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.lit
 
 import java.util.{Collections, UUID}
 
 import scala.collection.JavaConverters._
-import scala.util.Try
 
 case class DeleteFromPaimonTableCommand(
     relation: DataSourceV2Relation,
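Only the import block of `DeleteFromPaimonTableCommand` is visible here, but the changes are suggestive: `DataSourceV2ScanRelation` and `Try` go away, while `Filter`, `SupportsSubquery`, `Not`, and `createDataset` stay in scope. One way such a command can support subqueries is to stop inspecting the condition itself and instead wrap the relation in an ordinary logical `Filter`, letting Spark's analyzer resolve and rewrite any subqueries. A sketch of that idea, assuming Paimon's `Utils.createDataset(spark, plan)` builds a DataFrame from a logical plan (this is an illustration, not the command's actual body):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.Utils.createDataset
    import org.apache.spark.sql.catalyst.expressions.{Expression, Not}
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

    // Sketch: rows that do NOT match the delete condition are the ones to keep.
    // Because the condition is planned as a normal Filter, IN/EXISTS/scalar
    // subqueries are handled by Spark's own subquery rewriting.
    def planRemainingRows(
        spark: SparkSession,
        relation: LogicalPlan,
        deleteCondition: Expression): DataFrame = {
      createDataset(spark, Filter(Not(deleteCondition), relation))
    }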
@@ -87,7 +87,49 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
     )
   }
 
-  test("Paimon Delete: append-only table, condition contains subquery") {
+  test("Paimon Delete: append-only table, condition contains IN/NOT IN subquery") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)
+                 |""".stripMargin)
+
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'),
+                |       (3, 'c', '2025'), (4, 'd', '2025'),
+                |       (5, 'e', '2026'), (6, 'f', '2026')
+                |""".stripMargin)
+
+    Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")
+
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE id >= (SELECT MAX(key) FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", "2025"), (5, "e", "2026"))
+        .toDF()
+    )
+
+    // IN
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE id IN (SELECT key FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (3, "c", "2025"), (5, "e", "2026")).toDF()
+    )
+
+    // NOT IN: (4, 5, 6)
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE id NOT IN (SELECT key + key % 3 FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((5, "e", "2026")).toDF()
+    )
+  }
+
+  test("Paimon Delete: append-only table, condition contains EXISTS/NOT EXISTS subquery") {
     spark.sql(s"""
                  |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)
                  |""".stripMargin)
@@ -97,9 +139,31 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
                  |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')
                  |""".stripMargin)
 
-    Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
-    assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE id IN (SELECT * FROM updated_ids)"))
-      .hasMessageContaining("Subqueries are not supported")
+    Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")
+
+    // EXISTS
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE EXISTS (SELECT * FROM source WHERE key > 7)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", "2025")).toDF())
+
+    // NOT EXISTS
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE NOT EXISTS (SELECT * FROM source WHERE key > 5)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", "2025")).toDF()
+    )
+    spark.sql("""
+                |DELETE FROM T
+                |WHERE NOT EXISTS (SELECT * FROM source WHERE key > 7)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      spark.emptyDataFrame
+    )
   }
 
   CoreOptions.MergeEngine.values().foreach {
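One caveat worth noting about the NOT IN cases above: SQL three-valued logic means a NULL produced by the subquery makes `NOT IN` match no rows at all. The tests pass because `source` is NULL-free; a hypothetical variant illustrating the pitfall (`source_with_null` is not part of the commit):

    import spark.implicits._

    // Hypothetical: with a NULL in the subquery result, `id NOT IN (...)` is
    // never TRUE, so this DELETE removes nothing.
    Seq(Some(2), None).toDF("key").createOrReplaceTempView("source_with_null")
    spark.sql("DELETE FROM T WHERE id NOT IN (SELECT key FROM source_with_null)")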
@@ -102,10 +102,56 @@ class UpdateTableTest extends PaimonSparkTestBase {
                  |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')
                  |""".stripMargin)
 
-    Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
-    assertThatThrownBy(
-      () => spark.sql("UPDATE T set name = 'in_new' WHERE id IN (SELECT * FROM updated_ids)"))
-      .hasMessageContaining("Subqueries are not supported")
+    Seq(2, 4, 6).toDF("key").createOrReplaceTempView("source")
+
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '2')
+                |WHERE id < (SELECT MIN(key) FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a2", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", "2025")).toDF()
+    )
+
+    // EXISTS
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '3')
+                |WHERE EXISTS (SELECT * FROM source WHERE key > 5)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a3", "2024"), (2, "b3", "2024"), (3, "c3", "2025"), (4, "d3", "2025")).toDF()
+    )
+
+    // NOT EXISTS
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '4')
+                |WHERE NOT EXISTS (SELECT * FROM source WHERE key > 5)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a3", "2024"), (2, "b3", "2024"), (3, "c3", "2025"), (4, "d3", "2025")).toDF()
+    )
+
+    // IN
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '5')
+                |WHERE id IN (SELECT key FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a3", "2024"), (2, "b5", "2024"), (3, "c3", "2025"), (4, "d5", "2025")).toDF()
+    )
+
+    // NOT IN
+    spark.sql("""
+                |UPDATE T
+                |SET name = concat(substring(name, 0, 1), '6')
+                |WHERE id NOT IN (SELECT key FROM source)""".stripMargin)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a6", "2024"), (2, "b5", "2024"), (3, "c6", "2025"), (4, "d5", "2025")).toDF()
+    )
   }
 
   CoreOptions.MergeEngine.values().foreach {
