Skip to content

Commit

Permalink
[spark] Fix merge into update on source eq target condition (apache#3665
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Zouxxyy authored Jul 3, 2024
1 parent a08d7f3 commit 6573930
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ trait RowLevelHelper extends SQLConfHelper {
case Assignment(key, value) => key == value
}
.exists {
case EqualTo(left: AttributeReference, _) =>
isTargetPrimaryKey(left)
case EqualTo(left: AttributeReference, right: AttributeReference) =>
isTargetPrimaryKey(left) || isTargetPrimaryKey(right)
case Assignment(key: AttributeReference, _) =>
isTargetPrimaryKey(key)
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,4 +621,28 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase {
assert(error.contains("Only support to MergeInto table with primary keys."))
}
}

test(s"Paimon MergeInto: update on source eq target condition") {
withTable("source", "target") {
Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source")

sql(s"""
|CREATE TABLE target (a INT, b INT, c STRING)
|TBLPROPERTIES ('primary-key'='a')
|""".stripMargin)
sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")

sql(s"""
|MERGE INTO target
|USING source
|ON source.a = target.a
|WHEN MATCHED THEN
|UPDATE SET a = source.a, b = source.b, c = source.c
|""".stripMargin)

checkAnswer(
sql("SELECT * FROM target ORDER BY a, b"),
Row(1, 100, "c11") :: Row(2, 20, "c2") :: Nil)
}
}
}

0 comments on commit 6573930

Please sign in to comment.