Commit d87f89c

[spark] update for append table
YannByron committed Apr 1, 2024
1 parent 5b2ed7b commit d87f89c
Showing 17 changed files with 483 additions and 106 deletions.
@@ -28,7 +28,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.spark.DynamicOverWrite$;
import org.apache.paimon.spark.SparkUtils;
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper;
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils;
import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.BucketMode;
@@ -143,9 +143,9 @@ public InternalRow[] call(InternalRow args) {
LogicalPlan relation = createRelation(tableIdent);
Expression condition = null;
if (!StringUtils.isBlank(finalWhere)) {
condition = ExpressionHelper.resolveFilter(spark(), relation, finalWhere);
condition = ExpressionUtils.resolveFilter(spark(), relation, finalWhere);
checkArgument(
ExpressionHelper.onlyHasPartitionPredicate(
ExpressionUtils.isValidPredicate(
spark(),
condition,
table.partitionKeys().toArray(new String[0])),
@@ -188,7 +188,7 @@ private boolean execute(
Predicate filter =
condition == null
? null
: ExpressionHelper.convertConditionToPaimonPredicate(
: ExpressionUtils.convertConditionToPaimonPredicate(
condition, relation.output(), table.rowType());
switch (bucketMode) {
case FIXED:
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark

import org.apache.paimon.table.Table
import org.apache.paimon.table.source.{DataSplit, Split}

import org.apache.spark.sql.connector.read.{Batch, Scan}
import org.apache.spark.sql.types.StructType

/** For internal use only. */
case class PaimonSplitScan(table: Table, dataSplits: Array[DataSplit]) extends Scan {

override def readSchema(): StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())

override def toBatch: Batch = {
PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder)
}

}
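
Note: a minimal usage sketch, not part of this commit, showing how such a scan could be assembled from splits planned through Paimon's standard ReadBuilder API. The table handle and the decision of which splits to keep are assumed for illustration.

    import scala.collection.JavaConverters._

    import org.apache.paimon.table.Table
    import org.apache.paimon.table.source.DataSplit

    def scanForSplits(table: Table): PaimonSplitScan = {
      // Plan the table once and keep only the data splits to be re-read.
      val splits = table.newReadBuilder.newScan().plan().splits().asScala
      val dataSplits = splits.collect { case ds: DataSplit => ds }.toArray
      // PaimonSplitScan reads exactly these splits via toBatch (a PaimonBatch).
      PaimonSplitScan(table, dataSplits)
    }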
@@ -30,7 +30,7 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperators {
case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved =>
checkPaimonTable(table)
checkPaimonTable(table.getTable)

DeleteFromPaimonTableCommand(table, d)
}
@@ -46,7 +46,7 @@ trait PaimonMergeIntoBase
val v2Table = relation.table.asInstanceOf[SparkTable]
val targetOutput = relation.output

checkPaimonTable(v2Table)
checkPaimonTable(v2Table.getTable)
checkCondition(merge.mergeCondition)
merge.matchedActions.flatMap(_.condition).foreach(checkCondition)
merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition)
@@ -18,27 +18,47 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule

object PaimonUpdateTable extends Rule[LogicalPlan] with RowLevelHelper {
import scala.collection.JavaConverters._

object PaimonUpdateTable
extends Rule[LogicalPlan]
with RowLevelHelper
with AssignmentAlignmentHelper {

override val operation: RowLevelOp = Update

override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperators {
case u @ UpdateTable(PaimonRelation(table), assignments, _) if u.resolved =>
checkPaimonTable(table)
case u @ UpdateTable(PaimonRelation(table), assignments, condition) if u.resolved =>
checkPaimonTable(table.getTable)

val primaryKeys = table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",")
if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) {
throw new RuntimeException("Can't update the primary key column.")
}
table.getTable match {
case paimonTable: FileStoreTable =>
val primaryKeys = paimonTable.primaryKeys().asScala
if (primaryKeys.isEmpty) {
condition.foreach(checkSubquery)
}
if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) {
throw new RuntimeException("Can't update the primary key column.")
}

UpdatePaimonTableCommand(u)
val relation = PaimonRelation.getPaimonRelation(u.table)
UpdatePaimonTableCommand(
relation,
paimonTable,
condition.getOrElse(TrueLiteral),
assignments)

case _ =>
throw new RuntimeException("Update Operation is only supported for FileStoreTable.")
}
}
}
}
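
Note: the FileStoreTable branch above is what newly admits UPDATE on append tables (tables declared without a primary key): when primaryKeys is empty the condition is only checked for subqueries, and the rewrite goes to UpdatePaimonTableCommand with the resolved relation. An illustrative statement that now takes this path; the table and column names are made up.

    // Assumes t_append is an existing Paimon table declared without a primary key.
    spark.sql("UPDATE t_append SET status = 'archived' WHERE dt = '2024-03-31'")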
@@ -18,33 +18,18 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.CoreOptions.MERGE_ENGINE
import org.apache.paimon.options.Options
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.table.Table

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.Assignment

trait RowLevelHelper extends SQLConfHelper {

val operation: RowLevelOp

protected def checkPaimonTable(table: SparkTable): Unit = {
val paimonTable = if (table.getTable.primaryKeys().size() > 0) {
table.getTable
} else {
throw new UnsupportedOperationException(
s"Only support to $operation table with primary keys.")
}

val options = Options.fromMap(paimonTable.options)
val mergeEngine = options.get(MERGE_ENGINE)
if (!operation.supportedMergeEngine.contains(mergeEngine)) {
throw new UnsupportedOperationException(
s"merge engine $mergeEngine can not support $operation, currently only ${operation.supportedMergeEngine
.mkString(", ")} can support $operation.")
}
protected def checkPaimonTable(table: Table): Unit = {
operation.checkValidity(table)
}

protected def checkSubquery(condition: Expression): Unit = {
@@ -18,28 +18,63 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.AppendOnlyFileStore
import org.apache.paimon.CoreOptions.{MERGE_ENGINE, MergeEngine}
import org.apache.paimon.options.Options
import org.apache.paimon.spark.catalyst.analysis.RowLevelOp.{AppendTable, PrimaryKeyTable}
import org.apache.paimon.table.Table

sealed trait RowLevelOp {
val supportedMergeEngine: Seq[MergeEngine]

val name: String = this.getClass.getSimpleName

protected val supportedMergeEngine: Seq[MergeEngine]

protected val supportedTable: Seq[String]

def checkValidity(table: Table): Unit = {
if (!supportedTable.contains(table.getClass.getSimpleName)) {
throw new UnsupportedOperationException(s"Only support to $name table with primary keys.")
}

val mergeEngine = Options.fromMap(table.options).get(MERGE_ENGINE)
if (!supportedMergeEngine.contains(mergeEngine)) {
throw new UnsupportedOperationException(
s"merge engine $mergeEngine can not support $name, currently only ${supportedMergeEngine
.mkString(", ")} can support $name.")
}
}
}

case object Delete extends RowLevelOp {
override def toString: String = "delete"

override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE)

override val supportedTable: Seq[String] = Seq(PrimaryKeyTable)

}

case object Update extends RowLevelOp {
override def toString: String = "update"

override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)

override val supportedTable: Seq[String] = Seq(PrimaryKeyTable, AppendTable)

}

case object MergeInto extends RowLevelOp {
override def toString: String = "merge into"

override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)

override val supportedTable: Seq[String] = Seq(PrimaryKeyTable)

}

object RowLevelOp {

val AppendTable = "AppendOnlyFileStoreTable"
val PrimaryKeyTable = "PrimaryKeyFileStoreTable"

}
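
Note: a sketch of how a resolution rule consults these ops, mirroring checkPaimonTable in RowLevelHelper above; paimonTable is assumed to be the unwrapped org.apache.paimon.table.Table.

    // Delete still requires a primary-key table with the deduplicate merge engine;
    // Update now also accepts AppendOnlyFileStoreTable.
    Delete.checkValidity(paimonTable) // throws UnsupportedOperationException if unsupported
    Update.checkValidity(paimonTable)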
@@ -24,6 +24,7 @@ import org.apache.paimon.types.RowType

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
@@ -100,35 +101,37 @@ trait ExpressionHelper extends PredicateHelper {
throw new UnsupportedOperationException(
s"Unsupported update expression: $other, only support update with PrimitiveType and StructType.")
}
}

object ExpressionHelper {

case class FakeLogicalPlan(exprs: Seq[Expression], children: Seq[LogicalPlan])
extends LogicalPlan {
override def output: Seq[Attribute] = Nil

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children = newChildren)
}

def resolveFilter(spark: SparkSession, plan: LogicalPlan, where: String): Expression = {
val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(where)
def resolveFilter(spark: SparkSession, plan: LogicalPlan, conditionSql: String): Expression = {
val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(conditionSql)
val filter = Filter(unResolvedExpression, plan)
spark.sessionState.analyzer.execute(filter) match {
case filter: Filter => filter.condition
case _ => throw new RuntimeException(s"Could not resolve expression $where in plan: $plan")
case _ =>
throw new RuntimeException(s"Could not resolve expression $conditionSql in plan: $plan")
}
}

def onlyHasPartitionPredicate(
def isPredicatePartitionColumnsOnly(
condition: Expression,
partitionColumns: Seq[String],
resolver: Resolver
): Boolean = {
condition.references.forall(r => partitionColumns.exists(resolver(r.name, _)))
}

/**
* A valid predicate should meet two requirements: 1) This predicate only contains partition
* columns. 2) This predicate doesn't contain subquery.
*/
def isValidPredicate(
spark: SparkSession,
expr: Expression,
partitionCols: Array[String]): Boolean = {
val resolvedNameEquals = spark.sessionState.analyzer.resolver
val resolver = spark.sessionState.analyzer.resolver
splitConjunctivePredicates(expr).forall(
e =>
e.references.forall(r => partitionCols.exists(resolvedNameEquals(r.name, _))) &&
isPredicatePartitionColumnsOnly(e, partitionCols, resolver) &&
!SubqueryExpression.hasSubquery(expr))
}

@@ -148,12 +151,16 @@ object ExpressionHelper {
val predicates = filters.map(converter.convert)
PredicateBuilder.and(predicates: _*)
}
}

def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
condition match {
case And(cond1, cond2) =>
splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => other :: Nil
}
object ExpressionHelper {

case class FakeLogicalPlan(exprs: Seq[Expression], children: Seq[LogicalPlan])
extends LogicalPlan {
override def output: Seq[Attribute] = Nil

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children = newChildren)
}

}
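
Note: an illustrative call path, not part of this diff, combining the helpers the way the procedure change above does; the partition column name is made up.

    // Resolve a WHERE string against the table's relation, then require that it
    // references only partition columns and contains no subquery.
    val condition = ExpressionUtils.resolveFilter(spark, relation, "dt = '2024-03-31'")
    if (!ExpressionUtils.isValidPredicate(spark, condition, Array("dt"))) {
      throw new IllegalArgumentException("Only partition predicates are supported.")
    }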
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.analysis.expressions

/** This wrapper is only used in java code, e.g. Procedure. */
object ExpressionUtils extends ExpressionHelper
@@ -21,7 +21,6 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.options.Options
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
import org.apache.paimon.spark.{InsertInto, SparkTable}
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper.convertConditionToPaimonPredicate
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
@@ -19,13 +19,13 @@
package org.apache.paimon.spark.commands

import org.apache.paimon.spark.SparkFilterConverter
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.types.RowType

import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}

/** Helper trait for all paimon commands. */
trait PaimonCommand extends WithFileStoreTable with PredicateHelper {
trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {

/**
* For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
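
Note: with ExpressionHelper mixed in instead of the bare PredicateHelper, a command body can convert an optional Catalyst condition straight into a Paimon Predicate. A rough sketch; condition, relation and the command's table are assumed to be in scope.

    // condition: Option[Expression], relation: DataSourceV2Relation, table: FileStoreTable
    val paimonFilter =
      condition.map(convertConditionToPaimonPredicate(_, relation.output, table.rowType()))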