[spark] Support UPDATE command (apache#2268)
Zouxxyy authored Nov 14, 2023
1 parent 723d580 commit bb2f7d8
Showing 12 changed files with 619 additions and 105 deletions.
20 changes: 10 additions & 10 deletions docs/content/engines/spark3.md
@@ -221,18 +221,18 @@ dataset.show()

## Update Table

-For now, Paimon does not support `UPDATE` syntax. But we can use `INSERT INTO` syntax instead for changelog tables.
-
-```sql
-INSERT INTO my_table VALUES (1, 'Hi Again'), (3, 'Test');
-
-SELECT * FROM my_table;
-
-/*
-1 Hi Again
-2 Hello
-3 Test
-*/
-```
+{{< hint info >}}
+Important table properties setting:
+1. Only [primary key table]({{< ref "concepts/primary-key-table" >}}) supports this feature.
+2. [MergeEngine]({{< ref "concepts/primary-key-table#merge-engines" >}}) needs to be [deduplicate]({{< ref "concepts/primary-key-table#deduplicate" >}}) or [partial-update]({{< ref "concepts/primary-key-table#partial-update" >}}) to support this feature.
+{{< /hint >}}
+
+{{< hint warning >}}
+Warning: we do not support updating primary keys.
+{{< /hint >}}
+
+```sql
+UPDATE my_table SET v = 'new_value' WHERE id = 1;
+```

## Streaming Write
35 changes: 33 additions & 2 deletions docs/content/how-to/writing-tables.md
@@ -329,8 +329,6 @@ For more information of drop-partition, see
## Updating tables
Currently, Paimon supports updating records by using `UPDATE` in Flink 1.17 and later versions. You can perform `UPDATE` in Flink's `batch` mode.
{{< hint info >}}
Important table properties setting:
1. Only [primary key table]({{< ref "concepts/primary-key-table" >}}) supports this feature.
@@ -345,6 +343,8 @@ Warning: we do not support updating primary keys.
{{< tab "Flink" >}}
Currently, Paimon supports updating records by using `UPDATE` in Flink 1.17 and later versions. You can perform `UPDATE` in Flink's `batch` mode.
```sql
-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
@@ -366,6 +366,37 @@ UPDATE MyTable SET b = 1, c = 2 WHERE a = 'myTable';
{{< /tab >}}
{{< tab "Spark" >}}
To enable update, the following configurations are required:
```text
--conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
```
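
The same two settings can also be applied when building a SparkSession programmatically. The snippet below is a minimal sketch, not from the docs; the `local[*]` master is only for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: register Paimon's generic catalog and SQL extensions when
// constructing the session. The local master is illustrative only.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.spark_catalog", "org.apache.paimon.spark.SparkGenericCatalog")
  .config("spark.sql.extensions", "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
  .getOrCreate()
```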
Spark supports updating columns of PrimitiveType and StructType, for example:
```sql
-- Syntax
UPDATE table_identifier SET column1 = value1, column2 = value2, ... WHERE condition;
CREATE TABLE T (
id INT,
s STRUCT<c1: INT, c2: STRING>,
name STRING)
TBLPROPERTIES (
'primary-key' = 'id',
'merge-engine' = 'deduplicate'
);
-- you can use
UPDATE T SET name = 'a_new' WHERE id = 1;
UPDATE T SET s.c2 = 'a_new' WHERE s.c1 = 1;
```
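
As a quick end-to-end check, a hypothetical spark-shell session against the table `T` above might look like this (the inserted values are illustrative):

```scala
// Hypothetical spark-shell session; values are illustrative.
spark.sql("INSERT INTO T VALUES (1, named_struct('c1', 1, 'c2', 'a'), 'a')")
spark.sql("UPDATE T SET name = 'a_new' WHERE id = 1")
spark.sql("UPDATE T SET s.c2 = 'a_new' WHERE s.c1 = 1")
spark.sql("SELECT * FROM T").show()
// expected: the row with id = 1 now has name = 'a_new' and s.c2 = 'a_new'
```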
{{< /tab >}}
{{< /tabs >}}
## Deleting from table
@@ -18,8 +18,7 @@
package org.apache.paimon.spark.extensions

import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.catalyst.analysis.{CoerceArguments, PaimonAnalysis, ResolveProcedures}
-import org.apache.spark.sql.catalyst.optimizer.RewriteRowLeverCommands
+import org.apache.spark.sql.catalyst.analysis.{CoerceArguments, PaimonAnalysis, ResolveProcedures, RewriteRowLevelCommands}
import org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser
import org.apache.spark.sql.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
@@ -35,9 +34,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectResolutionRule(sparkSession => new PaimonAnalysis(sparkSession))
extensions.injectResolutionRule(spark => ResolveProcedures(spark))
extensions.injectResolutionRule(_ => CoerceArguments)

-// optimizer extensions
-extensions.injectOptimizerRule(_ => RewriteRowLeverCommands)
+extensions.injectPostHocResolutionRule(_ => RewriteRowLevelCommands)

// table function extensions
PaimonTableValuedFunctions.supportedFnNames.foreach {
@@ -25,7 +25,7 @@ object SparkSystemColumns {
// for assigning bucket when writing
val BUCKET_COL = "_bucket_"

-// for row lever operation
+// for row level operation
val ROW_KIND_COL = "_row_kind_"

val SPARK_SYSTEM_COLUMNS_NAME: Seq[String] = Seq(BUCKET_COL, ROW_KIND_COL)
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.paimon.CoreOptions.MergeEngine

sealed trait RowLevelOp {
val supportedMergeEngine: Seq[MergeEngine]
}

case object Delete extends RowLevelOp {
override def toString: String = "delete"

override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE)
}

case object Update extends RowLevelOp {
override def toString: String = "update"

override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
}
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CreateNamedStruct, Expression, GetStructField, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.Assignment
import org.apache.spark.sql.types.StructType

trait AssignmentAlignmentHelper extends SQLConfHelper {

private lazy val resolver = conf.resolver

/**
* @param ref
* attribute reference seq, e.g. a => Seq["a"], s.c1 => Seq["s", "c1"]
* @param expr
* update expression
*/
private case class AttrUpdate(ref: Seq[String], expr: Expression)

/**
* Align update assignments to the given attrs, only supports PrimitiveType and StructType. For
* example, if attrs are [a int, b int, s struct(c1 int, c2 int)] and update assignments are [a =
* 1, s.c1 = 2], will return [1, b, struct(2, c2)].
* @param attrs
* target attrs
* @param assignments
* update assignments
* @return
* aligned update expressions
*/
protected def alignUpdateAssignments(
attrs: Seq[Attribute],
assignments: Seq[Assignment]): Seq[Expression] = {
val attrUpdates = assignments.map(a => AttrUpdate(toRefSeq(a.key), a.value))
recursiveAlignUpdates(attrs, attrUpdates)
}

def toRefSeq(expr: Expression): Seq[String] = expr match {
case attr: Attribute =>
Seq(attr.name)
case GetStructField(child, _, Some(name)) =>
toRefSeq(child) :+ name
case other =>
throw new UnsupportedOperationException(
s"Unsupported update expression: $other, only support update with PrimitiveType and StructType.")
}

private def recursiveAlignUpdates(
targetAttrs: Seq[NamedExpression],
updates: Seq[AttrUpdate],
namePrefix: Seq[String] = Nil): Seq[Expression] = {

// build aligned updated expression for each target attr
targetAttrs.map {
targetAttr =>
val headMatchedUpdates = updates.filter(u => resolver(u.ref.head, targetAttr.name))
if (headMatchedUpdates.isEmpty) {
// when no matched update, return the attr as is
targetAttr
} else {
val exactMatchedUpdate = headMatchedUpdates.find(_.ref.size == 1)
if (exactMatchedUpdate.isDefined) {
if (headMatchedUpdates.size == 1) {
// when an exact match (no nested fields) occurs, it must be the only match, so return its expr
exactMatchedUpdate.get.expr
} else {
// otherwise, there must be conflicting updates, for example:
// - update the same attr multiple times
// - update a struct attr and its fields at the same time (e.g. s and s.c1)
val conflictingAttrNames =
headMatchedUpdates.map(u => (namePrefix ++ u.ref).mkString(".")).distinct
throw new UnsupportedOperationException(
s"Conflicting updates on attrs: ${conflictingAttrNames.mkString(", ")}"
)
}
} else {
targetAttr.dataType match {
case StructType(fields) =>
val fieldExprs = fields.zipWithIndex.map {
case (field, ordinal) =>
Alias(GetStructField(targetAttr, ordinal, Some(field.name)), field.name)()
}
val newUpdates = headMatchedUpdates.map(u => u.copy(ref = u.ref.tail))
// process StructType's nested fields recursively
val updatedFieldExprs =
recursiveAlignUpdates(fieldExprs, newUpdates, namePrefix :+ targetAttr.name)

// build updated struct expression
CreateNamedStruct(fields.zip(updatedFieldExprs).flatMap {
case (field, expr) =>
Seq(Literal(field.name), expr)
})
case _ =>
// can't reach here
throw new UnsupportedOperationException("")
}
}
}
}
}

}
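
As a rough illustration of the alignment described in the scaladoc above, the following hypothetical sketch (not part of the commit; `AlignmentSketch` is an invented name) feeds the documented example through `alignUpdateAssignments`:

```scala
import org.apache.spark.sql.catalyst.analysis.AssignmentAlignmentHelper
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Assignment
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical driver, not part of the commit: aligns [a = 1, s.c1 = 2]
// against attrs [a int, b int, s struct<c1 int, c2 int>].
object AlignmentSketch extends AssignmentAlignmentHelper {
  def main(args: Array[String]): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    val s = AttributeReference(
      "s",
      StructType(Seq(StructField("c1", IntegerType), StructField("c2", IntegerType))))()

    val assignments = Seq(
      Assignment(a, Literal(1)),                                 // a = 1
      Assignment(GetStructField(s, 0, Some("c1")), Literal(2)))  // s.c1 = 2

    // Per the scaladoc, this should yield [1, b, struct(2, s.c2)].
    alignUpdateAssignments(Seq(a, b, s), assignments).foreach(e => println(e.sql))
  }
}
```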
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.analysis

import org.apache.paimon.CoreOptions.MERGE_ENGINE
import org.apache.paimon.options.Options
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.table.Table

import org.apache.spark.sql.{AnalysisException, Delete, RowLevelOp, Update}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.SupportsDelete
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

import java.util

object RewriteRowLevelCommands extends Rule[LogicalPlan] with PredicateHelper {

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case d @ DeleteFromTable(ResolvesToPaimonTable(table), condition) =>
validateRowLevelOp(Delete, table.getTable, Option.empty)
if (canDeleteWhere(d, table, condition)) {
d
} else {
DeleteFromPaimonTableCommand(d)
}

case u @ UpdateTable(ResolvesToPaimonTable(table), assignments, _) =>
validateRowLevelOp(Update, table.getTable, Option.apply(assignments))
UpdatePaimonTableCommand(u)
}

private object ResolvesToPaimonTable {
def unapply(plan: LogicalPlan): Option[SparkTable] =
EliminateSubqueryAliases(plan) match {
case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table)
case _ => None
}
}

private def validateRowLevelOp(
op: RowLevelOp,
table: Table,
assignments: Option[Seq[Assignment]]): Unit = {
val options = Options.fromMap(table.options)
val primaryKeys = table.primaryKeys()
if (primaryKeys.isEmpty) {
throw new UnsupportedOperationException(
s"table ${table.getClass.getName} can not support $op, because there is no primary key.")
}

if (op.equals(Update) && isPrimaryKeyUpdate(primaryKeys, assignments.get)) {
throw new UnsupportedOperationException(s"$op to primary keys is not supported.")
}

val mergeEngine = options.get(MERGE_ENGINE)
if (!op.supportedMergeEngine.contains(mergeEngine)) {
throw new UnsupportedOperationException(
s"merge engine $mergeEngine can not support $op, currently only ${op.supportedMergeEngine
.mkString(", ")} can support $op.")
}
}

private def canDeleteWhere(
d: DeleteFromTable,
table: SparkTable,
condition: Expression): Boolean = {
table match {
case t: SupportsDelete if !SubqueryExpression.hasSubquery(condition) =>
// fail if any filter cannot be converted.
// correctness depends on removing all matching data.
val filters = DataSourceStrategy
.normalizeExprs(Seq(condition), d.output)
.flatMap(splitConjunctivePredicates(_).map {
f =>
DataSourceStrategy
.translateFilter(f, supportNestedPredicatePushdown = true)
.getOrElse(throw new AnalysisException(s"Exec update failed:" +
s" cannot translate expression to source filter: $f"))
})
.toArray
t.canDeleteWhere(filters)
case _ => false
}
}

private def isPrimaryKeyUpdate(
primaryKeys: util.List[String],
assignments: Seq[Assignment]): Boolean = {
assignments.exists(
a => {
a.key match {
case attr: Attribute => primaryKeys.contains(attr.name)
case _ => false
}
})
}

}
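
For a sense of what the validation above allows and rejects, here is a hypothetical spark-shell session (the table name `pk_t` is a placeholder):

```scala
// Hypothetical spark-shell session illustrating validateRowLevelOp; pk_t is a placeholder.
spark.sql("""
  CREATE TABLE pk_t (id INT, v STRING)
  TBLPROPERTIES ('primary-key' = 'id', 'merge-engine' = 'deduplicate')
""")

// Allowed: primary-key table with the deduplicate merge engine, and no
// primary-key column is assigned.
spark.sql("UPDATE pk_t SET v = 'new' WHERE id = 1")

// Rejected with UnsupportedOperationException: the assignment touches a primary key.
// spark.sql("UPDATE pk_t SET id = 2 WHERE id = 1")
```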
