Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Jan 21, 2025
1 parent dbd129d commit 643ff6c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.paimon.spark.SparkTable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, ResolvedTable}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

import scala.util.control.NonFatal
Expand All @@ -32,6 +32,7 @@ object PaimonRelation extends Logging {

def unapply(plan: LogicalPlan): Option[SparkTable] =
EliminateSubqueryAliases(plan) match {
case Project(_, DataSourceV2Relation(table: SparkTable, _, _, _, _)) => Some(table)
case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table)
case ResolvedTable(_, _, table: SparkTable, _) => Some(table)
case _ => None
Expand All @@ -49,6 +50,7 @@ object PaimonRelation extends Logging {

def getPaimonRelation(plan: LogicalPlan): DataSourceV2Relation = {
EliminateSubqueryAliases(plan) match {
case Project(_, d @ DataSourceV2Relation(_: SparkTable, _, _, _, _)) => d
case d @ DataSourceV2Relation(_: SparkTable, _, _, _, _) => d
case _ => throw new RuntimeException(s"It's not a paimon table, $plan")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.catalyst.analysis.Update

import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}

abstract class UpdateTableTestBase extends PaimonSparkTestBase {
Expand Down Expand Up @@ -349,4 +350,11 @@ abstract class UpdateTableTestBase extends PaimonSparkTestBase {
() => spark.sql("UPDATE T SET s.c2 = 'a_new', s = struct(11, 'a_new') WHERE s.c1 = 1"))
.hasMessageContaining("Conflicting update/insert on attrs: s.c2, s")
}

test("Paimon update: update table with char type") {
sql("CREATE TABLE T (id INT, s STRING, c CHAR(1))")
sql("INSERT INTO T VALUES (1, 's', 'a')")
sql("UPDATE T SET c = 'b' WHERE id = 1")
checkAnswer(sql("SELECT * FROM T"), Seq(Row(1, "s", "b")))
}
}

0 comments on commit 643ff6c

Please sign in to comment.