[spark] Support UPDATE for append table (#3129)
YannByron authored Apr 1, 2024
1 parent 2ced694 commit f232725
Showing 19 changed files with 479 additions and 109 deletions.
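For context (not part of the commit): this change lets Spark's UPDATE statement run against a Paimon append table, i.e. a table declared without a primary key. Below is a minimal sketch of the resulting user-facing behaviour; the session extensions, the catalog name "paimon", the warehouse path, and the table paimon.default.t are illustrative assumptions taken from the documented Spark setup, not from this diff.

import org.apache.spark.sql.SparkSession

object UpdateAppendTableSketch {
  def main(args: Array[String]): Unit = {
    // Assumed local setup: Paimon session extensions plus a Paimon catalog named "paimon".
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.sql.extensions", "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.warehouse", "file:///tmp/paimon-warehouse")
      .getOrCreate()

    // An append table: no primary key is declared, so the update cannot be expressed as a PK upsert.
    spark.sql(
      "CREATE TABLE IF NOT EXISTS paimon.default.t (id INT, name STRING, pt STRING) PARTITIONED BY (pt)")
    spark.sql("INSERT INTO paimon.default.t VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")

    // Before this commit the analysis rule rejected UPDATE on tables without primary keys.
    spark.sql("UPDATE paimon.default.t SET name = 'a_new' WHERE id = 1")
    spark.sql("SELECT * FROM paimon.default.t ORDER BY id").show()

    spark.stop()
  }
}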
@@ -101,8 +101,16 @@ public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bu
}

public Path bucketPath(BinaryRow partition, int bucket) {
return new Path(
root + "/" + getPartitionString(partition) + "/" + BUCKET_PATH_PREFIX + bucket);
return new Path(root + "/" + relativePartitionAndBucketPath(partition, bucket));
}

public Path relativePartitionAndBucketPath(BinaryRow partition, int bucket) {
String partitionPath = getPartitionString(partition);
if (partitionPath.isEmpty()) {
return new Path(BUCKET_PATH_PREFIX + bucket);
} else {
return new Path(getPartitionString(partition) + "/" + BUCKET_PATH_PREFIX + bucket);
}
}

/** IMPORTANT: This method is NOT THREAD SAFE. */
@@ -28,7 +28,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.spark.DynamicOverWrite$;
import org.apache.paimon.spark.SparkUtils;
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper;
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils;
import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.BucketMode;
@@ -143,9 +143,9 @@ public InternalRow[] call(InternalRow args) {
LogicalPlan relation = createRelation(tableIdent);
Expression condition = null;
if (!StringUtils.isBlank(finalWhere)) {
condition = ExpressionHelper.resolveFilter(spark(), relation, finalWhere);
condition = ExpressionUtils.resolveFilter(spark(), relation, finalWhere);
checkArgument(
ExpressionHelper.onlyHasPartitionPredicate(
ExpressionUtils.isValidPredicate(
spark(),
condition,
table.partitionKeys().toArray(new String[0])),
@@ -188,7 +188,7 @@ private boolean execute(
Predicate filter =
condition == null
? null
: ExpressionHelper.convertConditionToPaimonPredicate(
: ExpressionUtils.convertConditionToPaimonPredicate(
condition, relation.output(), table.rowType());
switch (bucketMode) {
case FIXED:
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark

import org.apache.paimon.table.Table
import org.apache.paimon.table.source.{DataSplit, Split}

import org.apache.spark.sql.connector.read.{Batch, Scan}
import org.apache.spark.sql.types.StructType

/** For internal use only. */
case class PaimonSplitScan(table: Table, dataSplits: Array[DataSplit]) extends Scan {

override def readSchema(): StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())

override def toBatch: Batch = {
PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder)
}

}
@@ -30,7 +30,7 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperators {
case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved =>
checkPaimonTable(table)
checkPaimonTable(table.getTable)

DeleteFromPaimonTableCommand(table, d)
}
@@ -46,7 +46,7 @@ trait PaimonMergeIntoBase
val v2Table = relation.table.asInstanceOf[SparkTable]
val targetOutput = relation.output

checkPaimonTable(v2Table)
checkPaimonTable(v2Table.getTable)
checkCondition(merge.mergeCondition)
merge.matchedActions.flatMap(_.condition).foreach(checkCondition)
merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition)
@@ -18,27 +18,47 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule

object PaimonUpdateTable extends Rule[LogicalPlan] with RowLevelHelper {
import scala.collection.JavaConverters._

object PaimonUpdateTable
extends Rule[LogicalPlan]
with RowLevelHelper
with AssignmentAlignmentHelper {

override val operation: RowLevelOp = Update

override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperators {
case u @ UpdateTable(PaimonRelation(table), assignments, _) if u.resolved =>
checkPaimonTable(table)
case u @ UpdateTable(PaimonRelation(table), assignments, condition) if u.resolved =>
checkPaimonTable(table.getTable)

val primaryKeys = table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",")
if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) {
throw new RuntimeException("Can't update the primary key column.")
}
table.getTable match {
case paimonTable: FileStoreTable =>
val primaryKeys = paimonTable.primaryKeys().asScala
if (primaryKeys.isEmpty) {
condition.foreach(checkSubquery)
}
if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) {
throw new RuntimeException("Can't update the primary key column.")
}

UpdatePaimonTableCommand(u)
val relation = PaimonRelation.getPaimonRelation(u.table)
UpdatePaimonTableCommand(
relation,
paimonTable,
condition.getOrElse(TrueLiteral),
assignments)

case _ =>
throw new RuntimeException("Update Operation is only supported for FileStoreTable.")
}
}
}
}
@@ -18,33 +18,18 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.CoreOptions.MERGE_ENGINE
import org.apache.paimon.options.Options
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.table.Table

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.Assignment

trait RowLevelHelper extends SQLConfHelper {

val operation: RowLevelOp

protected def checkPaimonTable(table: SparkTable): Unit = {
val paimonTable = if (table.getTable.primaryKeys().size() > 0) {
table.getTable
} else {
throw new UnsupportedOperationException(
s"Only support to $operation table with primary keys.")
}

val options = Options.fromMap(paimonTable.options)
val mergeEngine = options.get(MERGE_ENGINE)
if (!operation.supportedMergeEngine.contains(mergeEngine)) {
throw new UnsupportedOperationException(
s"merge engine $mergeEngine can not support $operation, currently only ${operation.supportedMergeEngine
.mkString(", ")} can support $operation.")
}
protected def checkPaimonTable(table: Table): Unit = {
operation.checkValidity(table)
}

protected def checkSubquery(condition: Expression): Unit = {
@@ -18,28 +18,54 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.CoreOptions.{MERGE_ENGINE, MergeEngine}
import org.apache.paimon.options.Options
import org.apache.paimon.table.Table

sealed trait RowLevelOp {
val supportedMergeEngine: Seq[MergeEngine]

val name: String = this.getClass.getSimpleName.stripSuffix("$")

protected val supportedMergeEngine: Seq[MergeEngine]

protected val supportAppendOnlyTable: Boolean

def checkValidity(table: Table): Unit = {
if (!supportAppendOnlyTable && table.primaryKeys().isEmpty) {
throw new UnsupportedOperationException(s"Only support to $name table with primary keys.")
}

val mergeEngine = Options.fromMap(table.options).get(MERGE_ENGINE)
if (!supportedMergeEngine.contains(mergeEngine)) {
throw new UnsupportedOperationException(
s"merge engine $mergeEngine can not support $name, currently only ${supportedMergeEngine
.mkString(", ")} can support $name.")
}
}
}

case object Delete extends RowLevelOp {
override def toString: String = "delete"

override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE)

override val supportAppendOnlyTable: Boolean = false

}

case object Update extends RowLevelOp {
override def toString: String = "update"

override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)

override val supportAppendOnlyTable: Boolean = true

}

case object MergeInto extends RowLevelOp {
override def toString: String = "merge into"

override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)

override val supportAppendOnlyTable: Boolean = false

}
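A hedged illustration of the supportAppendOnlyTable flags above, reusing the SparkSession and the append table paimon.default.t from the sketch near the top of this page (both assumptions of the illustration, not part of the commit): on a table without primary keys, UPDATE is now accepted, while DELETE still requires primary keys and is rejected by checkValidity.

// Update.supportAppendOnlyTable = true: checkValidity lets the UPDATE through.
spark.sql("UPDATE paimon.default.t SET name = 'c' WHERE pt = 'p2'")

// Delete.supportAppendOnlyTable = false: checkValidity is expected to fail with
// "Only support to Delete table with primary keys."
try {
  spark.sql("DELETE FROM paimon.default.t WHERE pt = 'p2'")
} catch {
  case e: Exception => println(s"DELETE rejected: ${e.getMessage}")
}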
@@ -24,6 +24,7 @@ import org.apache.paimon.types.RowType

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
@@ -100,35 +101,37 @@ trait ExpressionHelper extends PredicateHelper {
throw new UnsupportedOperationException(
s"Unsupported update expression: $other, only support update with PrimitiveType and StructType.")
}
}

object ExpressionHelper {

case class FakeLogicalPlan(exprs: Seq[Expression], children: Seq[LogicalPlan])
extends LogicalPlan {
override def output: Seq[Attribute] = Nil

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children = newChildren)
}

def resolveFilter(spark: SparkSession, plan: LogicalPlan, where: String): Expression = {
val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(where)
def resolveFilter(spark: SparkSession, plan: LogicalPlan, conditionSql: String): Expression = {
val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(conditionSql)
val filter = Filter(unResolvedExpression, plan)
spark.sessionState.analyzer.execute(filter) match {
case filter: Filter => filter.condition
case _ => throw new RuntimeException(s"Could not resolve expression $where in plan: $plan")
case _ =>
throw new RuntimeException(s"Could not resolve expression $conditionSql in plan: $plan")
}
}

def onlyHasPartitionPredicate(
def isPredicatePartitionColumnsOnly(
condition: Expression,
partitionColumns: Seq[String],
resolver: Resolver
): Boolean = {
condition.references.forall(r => partitionColumns.exists(resolver(r.name, _)))
}

/**
* A valid predicate should meet two requirements: 1) This predicate only contains partition
* columns. 2) This predicate doesn't contain subquery.
*/
def isValidPredicate(
spark: SparkSession,
expr: Expression,
partitionCols: Array[String]): Boolean = {
val resolvedNameEquals = spark.sessionState.analyzer.resolver
val resolver = spark.sessionState.analyzer.resolver
splitConjunctivePredicates(expr).forall(
e =>
e.references.forall(r => partitionCols.exists(resolvedNameEquals(r.name, _))) &&
isPredicatePartitionColumnsOnly(e, partitionCols, resolver) &&
!SubqueryExpression.hasSubquery(expr))
}

@@ -148,12 +151,16 @@ object ExpressionHelper {
val predicates = filters.map(converter.convert)
PredicateBuilder.and(predicates: _*)
}
}

def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
condition match {
case And(cond1, cond2) =>
splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => other :: Nil
}
object ExpressionHelper {

case class FakeLogicalPlan(exprs: Seq[Expression], children: Seq[LogicalPlan])
extends LogicalPlan {
override def output: Seq[Attribute] = Nil

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children = newChildren)
}

}
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.analysis.expressions

/** This wrapper is only used in java code, e.g. Procedure. */
object ExpressionUtils extends ExpressionHelper
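Since the new ExpressionUtils object above simply re-exports ExpressionHelper for Java callers such as the procedure near the top of this diff, the renamed helpers can also be exercised directly from Scala. A minimal sketch of the two-part rule documented on isValidPredicate (partition columns only, no subquery), again reusing the SparkSession and the pt-partitioned table paimon.default.t from the first sketch as assumptions:

import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils

// Resolve condition strings against the analyzed relation, mirroring how resolveFilter is used above.
val plan = spark.table("paimon.default.t").queryExecution.analyzed
val partitionOnly = ExpressionUtils.resolveFilter(spark, plan, "pt = 'p1'")
val nonPartition = ExpressionUtils.resolveFilter(spark, plan, "id > 0")

// true: every conjunct references only the partition column pt and contains no subquery.
ExpressionUtils.isValidPredicate(spark, partitionOnly, Array("pt"))
// false: the predicate references id, which is not a partition column.
ExpressionUtils.isValidPredicate(spark, nonPartition, Array("pt"))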
@@ -21,7 +21,6 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.options.Options
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
import org.apache.paimon.spark.{InsertInto, SparkTable}
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper.convertConditionToPaimonPredicate
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
@@ -19,13 +19,13 @@
package org.apache.paimon.spark.commands

import org.apache.paimon.spark.SparkFilterConverter
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.types.RowType

import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}

/** Helper trait for all paimon commands. */
trait PaimonCommand extends WithFileStoreTable with PredicateHelper {
trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {

/**
* For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
