From 554ae64b7b4f01a52dfd069f2c718206975be7c4 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Fri, 22 Dec 2023 14:21:07 +0800 Subject: [PATCH] [spark] Merge Into: When Not Matched By Source (#2517) --- .../catalyst/analysis/PaimonMergeInto.scala | 56 +++++++ .../analysis/PaimonMergeIntoResolver.scala | 49 ++++++ .../catalyst/analysis/PaimonMergeInto.scala | 56 +++++++ .../analysis/PaimonMergeIntoResolver.scala | 49 ++++++ .../catalyst/analysis/PaimonMergeInto.scala | 56 +++++++ .../analysis/PaimonMergeIntoResolver.scala | 49 ++++++ .../catalyst/analysis/PaimonMergeInto.scala | 125 +++------------ .../analysis/PaimonMergeIntoBase.scala | 148 ++++++++++++++++++ .../analysis/PaimonMergeIntoResolver.scala | 69 +++----- .../PaimonMergeIntoResolverBase.scala | 102 ++++++++++++ .../spark/commands/MergeIntoPaimonTable.scala | 28 +++- .../paimon/spark/sql/MergeIntoTableTest.scala | 51 +++++- 12 files changed, 682 insertions(+), 156 deletions(-) create mode 100644 paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala create mode 100644 paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala create mode 100644 paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala create mode 100644 paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala create mode 100644 paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala create mode 100644 paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala new file mode 100644 index 000000000000..a10f4e939b26 --- /dev/null +++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.commands.MergeIntoPaimonTable + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable} + +/** A post-hoc resolution rule for MergeInto. */ +case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase { + + override def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + targetOutput: Seq[AttributeReference]): Seq[MergeAction] = { + Seq.empty + } + + override def buildMergeIntoPaimonTable( + v2Table: SparkTable, + merge: MergeIntoTable, + alignedMatchedActions: Seq[MergeAction], + alignedNotMatchedActions: Seq[MergeAction], + alignedNotMatchedBySourceActions: Seq[MergeAction]): MergeIntoPaimonTable = { + if (alignedNotMatchedBySourceActions.nonEmpty) { + throw new RuntimeException("WHEN NOT MATCHED BY SOURCE is not supported here.") + } + + MergeIntoPaimonTable( + v2Table, + merge.targetTable, + merge.sourceTable, + merge.mergeCondition, + alignedMatchedActions, + alignedNotMatchedActions, + alignedNotMatchedBySourceActions + ) + } +} diff --git a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala new file mode 100644 index 000000000000..35e2f36d4ece --- /dev/null +++ b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeAction, MergeIntoTable} + +object PaimonMergeIntoResolver extends PaimonMergeIntoResolverBase { + + def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + target: LogicalPlan, + source: LogicalPlan, + resolve: (Expression, LogicalPlan) => Expression): Seq[MergeAction] = { + Seq.empty + } + + def build( + merge: MergeIntoTable, + resolvedCond: Expression, + resolvedMatched: Seq[MergeAction], + resolvedNotMatched: Seq[MergeAction], + resolvedNotMatchedBySource: Seq[MergeAction]): MergeIntoTable = { + if (resolvedNotMatchedBySource.nonEmpty) { + throw new RuntimeException("WHEN NOT MATCHED BY SOURCE is not supported here.") + } + + merge.copy( + mergeCondition = resolvedCond, + matchedActions = resolvedMatched, + notMatchedActions = resolvedNotMatched) + } + +} diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala new file mode 100644 index 000000000000..a10f4e939b26 --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.commands.MergeIntoPaimonTable + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable} + +/** A post-hoc resolution rule for MergeInto. */ +case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase { + + override def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + targetOutput: Seq[AttributeReference]): Seq[MergeAction] = { + Seq.empty + } + + override def buildMergeIntoPaimonTable( + v2Table: SparkTable, + merge: MergeIntoTable, + alignedMatchedActions: Seq[MergeAction], + alignedNotMatchedActions: Seq[MergeAction], + alignedNotMatchedBySourceActions: Seq[MergeAction]): MergeIntoPaimonTable = { + if (alignedNotMatchedBySourceActions.nonEmpty) { + throw new RuntimeException("WHEN NOT MATCHED BY SOURCE is not supported here.") + } + + MergeIntoPaimonTable( + v2Table, + merge.targetTable, + merge.sourceTable, + merge.mergeCondition, + alignedMatchedActions, + alignedNotMatchedActions, + alignedNotMatchedBySourceActions + ) + } +} diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala new file mode 100644 index 000000000000..35e2f36d4ece --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeAction, MergeIntoTable} + +object PaimonMergeIntoResolver extends PaimonMergeIntoResolverBase { + + def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + target: LogicalPlan, + source: LogicalPlan, + resolve: (Expression, LogicalPlan) => Expression): Seq[MergeAction] = { + Seq.empty + } + + def build( + merge: MergeIntoTable, + resolvedCond: Expression, + resolvedMatched: Seq[MergeAction], + resolvedNotMatched: Seq[MergeAction], + resolvedNotMatchedBySource: Seq[MergeAction]): MergeIntoTable = { + if (resolvedNotMatchedBySource.nonEmpty) { + throw new RuntimeException("WHEN NOT MATCHED BY SOURCE is not supported here.") + } + + merge.copy( + mergeCondition = resolvedCond, + matchedActions = resolvedMatched, + notMatchedActions = resolvedNotMatched) + } + +} diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala new file mode 100644 index 000000000000..a10f4e939b26 --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.commands.MergeIntoPaimonTable + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable} + +/** A post-hoc resolution rule for MergeInto. */ +case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase { + + override def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + targetOutput: Seq[AttributeReference]): Seq[MergeAction] = { + Seq.empty + } + + override def buildMergeIntoPaimonTable( + v2Table: SparkTable, + merge: MergeIntoTable, + alignedMatchedActions: Seq[MergeAction], + alignedNotMatchedActions: Seq[MergeAction], + alignedNotMatchedBySourceActions: Seq[MergeAction]): MergeIntoPaimonTable = { + if (alignedNotMatchedBySourceActions.nonEmpty) { + throw new RuntimeException("WHEN NOT MATCHED BY SOURCE is not supported here.") + } + + MergeIntoPaimonTable( + v2Table, + merge.targetTable, + merge.sourceTable, + merge.mergeCondition, + alignedMatchedActions, + alignedNotMatchedActions, + alignedNotMatchedBySourceActions + ) + } +} diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala new file mode 100644 index 000000000000..35e2f36d4ece --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeAction, MergeIntoTable} + +object PaimonMergeIntoResolver extends PaimonMergeIntoResolverBase { + + def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + target: LogicalPlan, + source: LogicalPlan, + resolve: (Expression, LogicalPlan) => Expression): Seq[MergeAction] = { + Seq.empty + } + + def build( + merge: MergeIntoTable, + resolvedCond: Expression, + resolvedMatched: Seq[MergeAction], + resolvedNotMatched: Seq[MergeAction], + resolvedNotMatchedBySource: Seq[MergeAction]): MergeIntoTable = { + if (resolvedNotMatchedBySource.nonEmpty) { + throw new RuntimeException("WHEN NOT MATCHED BY SOURCE is not supported here.") + } + + merge.copy( + mergeCondition = resolvedCond, + matchedActions = resolvedMatched, + notMatchedActions = resolvedNotMatched) + } + +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala index 958f2e8c63dd..5eaa4f6dba51 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala @@ -17,117 +17,36 @@ */ package org.apache.paimon.spark.catalyst.analysis -import org.apache.paimon.CoreOptions import org.apache.paimon.spark.SparkTable -import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper import org.apache.paimon.spark.commands.MergeIntoPaimonTable import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression} -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable} /** A post-hoc resolution rule for MergeInto. */ -case class PaimonMergeInto(spark: SparkSession) - extends Rule[LogicalPlan] - with RowLevelHelper - with ExpressionHelper - with AssignmentAlignmentHelper { +case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase { - override val operation: RowLevelOp = MergeInto - - def apply(plan: LogicalPlan): LogicalPlan = { - plan.resolveOperators { - case merge: MergeIntoTable - if merge.resolved && PaimonRelation.isPaimonTable(merge.targetTable) => - val relation = PaimonRelation.getPaimonRelation(merge.targetTable) - val v2Table = relation.table.asInstanceOf[SparkTable] - val targetOutput = relation.output - - checkPaimonTable(v2Table) - checkCondition(merge.mergeCondition) - merge.matchedActions.flatMap(_.condition).foreach(checkCondition) - merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition) - - val updateActions = merge.matchedActions.collect { case a: UpdateAction => a } - val primaryKeys = v2Table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",") - checkUpdateActionValidity( - AttributeSet(targetOutput), - merge.mergeCondition, - updateActions, - primaryKeys) - - val alignedMatchedActions = - merge.matchedActions.map(checkAndAlignActionAssignment(_, targetOutput)) - val alignedNotMatchedActions = - merge.notMatchedActions.map(checkAndAlignActionAssignment(_, targetOutput)) - - MergeIntoPaimonTable( - v2Table, - merge.targetTable, - merge.sourceTable, - merge.mergeCondition, - alignedMatchedActions, - alignedNotMatchedActions) - } + override def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + targetOutput: Seq[AttributeReference]): Seq[MergeAction] = { + merge.notMatchedBySourceActions.map(checkAndAlignActionAssignment(_, targetOutput)) } - private def checkAndAlignActionAssignment( - action: MergeAction, - targetOutput: Seq[AttributeReference]): MergeAction = { - action match { - case d @ DeleteAction(_) => d - case u @ UpdateAction(_, assignments) => - u.copy(assignments = alignAssignments(targetOutput, assignments)) - - case i @ InsertAction(_, assignments) => - if (assignments.length != targetOutput.length) { - throw new RuntimeException("Can't align the table's columns in insert clause.") - } - i.copy(assignments = alignAssignments(targetOutput, assignments)) - - case _: UpdateStarAction => - throw new RuntimeException(s"UpdateStarAction should not be here.") - - case _: InsertStarAction => - throw new RuntimeException(s"InsertStarAction should not be here.") - - case _ => - throw new RuntimeException(s"Can't recognize this action: $action") - } - } - - private def checkCondition(condition: Expression): Unit = { - if (!condition.resolved) { - throw new RuntimeException(s"Condition $condition should have been resolved.") - } - if (SubqueryExpression.hasSubquery(condition)) { - throw new RuntimeException(s"Condition $condition with subquery can't be supported.") - } - } - - /** This check will avoid to update the primary key columns */ - private def checkUpdateActionValidity( - targetOutput: AttributeSet, - mergeCondition: Expression, - actions: Seq[UpdateAction], - primaryKeys: Seq[String]): Unit = { - // Check whether there are enough `EqualTo` expressions related to all the primary keys. - lazy val isMergeConditionValid = { - val mergeExpressions = splitConjunctivePredicates(mergeCondition) - primaryKeys.forall { - primaryKey => isUpdateExpressionToPrimaryKey(targetOutput, mergeExpressions, primaryKey) - } - } - - // Check whether there are an update expression related to any primary key. - def isUpdateActionValid(action: UpdateAction): Boolean = { - validUpdateAssignment(targetOutput, primaryKeys, action.assignments) - } - - val valid = isMergeConditionValid || actions.forall(isUpdateActionValid) - if (!valid) { - throw new RuntimeException("Can't update the primary key column in update clause.") - } + override def buildMergeIntoPaimonTable( + v2Table: SparkTable, + merge: MergeIntoTable, + alignedMatchedActions: Seq[MergeAction], + alignedNotMatchedActions: Seq[MergeAction], + alignedNotMatchedBySourceActions: Seq[MergeAction]): MergeIntoPaimonTable = { + MergeIntoPaimonTable( + v2Table, + merge.targetTable, + merge.sourceTable, + merge.mergeCondition, + alignedMatchedActions, + alignedNotMatchedActions, + alignedNotMatchedBySourceActions + ) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala new file mode 100644 index 000000000000..a4e80aed4b96 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.paimon.CoreOptions +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper +import org.apache.paimon.spark.commands.MergeIntoPaimonTable + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule + +trait PaimonMergeIntoBase + extends Rule[LogicalPlan] + with RowLevelHelper + with ExpressionHelper + with AssignmentAlignmentHelper { + + val spark: SparkSession + + override val operation: RowLevelOp = MergeInto + + def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperators { + case merge: MergeIntoTable + if merge.resolved && PaimonRelation.isPaimonTable(merge.targetTable) => + val relation = PaimonRelation.getPaimonRelation(merge.targetTable) + val v2Table = relation.table.asInstanceOf[SparkTable] + val targetOutput = relation.output + + checkPaimonTable(v2Table) + checkCondition(merge.mergeCondition) + merge.matchedActions.flatMap(_.condition).foreach(checkCondition) + merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition) + + val updateActions = merge.matchedActions.collect { case a: UpdateAction => a } + val primaryKeys = v2Table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",") + checkUpdateActionValidity( + AttributeSet(targetOutput), + merge.mergeCondition, + updateActions, + primaryKeys) + + val alignedMatchedActions = + merge.matchedActions.map(checkAndAlignActionAssignment(_, targetOutput)) + val alignedNotMatchedActions = + merge.notMatchedActions.map(checkAndAlignActionAssignment(_, targetOutput)) + val alignedNotMatchedBySourceActions = resolveNotMatchedBySourceActions(merge, targetOutput) + + MergeIntoPaimonTable( + v2Table, + merge.targetTable, + merge.sourceTable, + merge.mergeCondition, + alignedMatchedActions, + alignedNotMatchedActions, + alignedNotMatchedBySourceActions + ) + } + } + + def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + targetOutput: Seq[AttributeReference]): Seq[MergeAction] + + def buildMergeIntoPaimonTable( + v2Table: SparkTable, + merge: MergeIntoTable, + alignedMatchedActions: Seq[MergeAction], + alignedNotMatchedActions: Seq[MergeAction], + alignedNotMatchedBySourceActions: Seq[MergeAction]): MergeIntoPaimonTable + + protected def checkAndAlignActionAssignment( + action: MergeAction, + targetOutput: Seq[AttributeReference]): MergeAction = { + action match { + case d @ DeleteAction(_) => d + case u @ UpdateAction(_, assignments) => + u.copy(assignments = alignAssignments(targetOutput, assignments)) + + case i @ InsertAction(_, assignments) => + if (assignments.length != targetOutput.length) { + throw new RuntimeException("Can't align the table's columns in insert clause.") + } + i.copy(assignments = alignAssignments(targetOutput, assignments)) + + case _: UpdateStarAction => + throw new RuntimeException(s"UpdateStarAction should not be here.") + + case _: InsertStarAction => + throw new RuntimeException(s"InsertStarAction should not be here.") + + case _ => + throw new RuntimeException(s"Can't recognize this action: $action") + } + } + + private def checkCondition(condition: Expression): Unit = { + if (!condition.resolved) { + throw new RuntimeException(s"Condition $condition should have been resolved.") + } + if (SubqueryExpression.hasSubquery(condition)) { + throw new RuntimeException(s"Condition $condition with subquery can't be supported.") + } + } + + /** This check will avoid to update the primary key columns */ + private def checkUpdateActionValidity( + targetOutput: AttributeSet, + mergeCondition: Expression, + actions: Seq[UpdateAction], + primaryKeys: Seq[String]): Unit = { + // Check whether there are enough `EqualTo` expressions related to all the primary keys. + lazy val isMergeConditionValid = { + val mergeExpressions = splitConjunctivePredicates(mergeCondition) + primaryKeys.forall { + primaryKey => isUpdateExpressionToPrimaryKey(targetOutput, mergeExpressions, primaryKey) + } + } + + // Check whether there are an update expression related to any primary key. + def isUpdateActionValid(action: UpdateAction): Boolean = { + validUpdateAssignment(targetOutput, primaryKeys, action.assignments) + } + + val valid = isMergeConditionValid || actions.forall(isUpdateActionValid) + if (!valid) { + throw new RuntimeException("Can't update the primary key column in update clause.") + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala index e6f7379e6232..19bbf1996837 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolver.scala @@ -17,72 +17,55 @@ */ package org.apache.paimon.spark.catalyst.analysis -import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper - -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, InsertStarAction, LogicalPlan, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, LogicalPlan, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction} /** Resolve all the expressions for MergeInto. */ -object PaimonMergeIntoResolver extends ExpressionHelper { - - def apply(merge: MergeIntoTable, spark: SparkSession): LogicalPlan = { - val target = merge.targetTable - val source = merge.sourceTable - assert(target.resolved, "Target should have been resolved here.") - assert(source.resolved, "Source should have been resolved here.") - - val condition = merge.mergeCondition - val matched = merge.matchedActions - val notMatched = merge.notMatchedActions - - val resolve = resolveExpression(spark) _ +object PaimonMergeIntoResolver extends PaimonMergeIntoResolverBase { + def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + target: LogicalPlan, + source: LogicalPlan, + resolve: (Expression, LogicalPlan) => Expression): Seq[MergeAction] = { def resolveMergeAction(action: MergeAction): MergeAction = { action match { case DeleteAction(condition) => - val resolvedCond = condition.map(resolve(_, merge)) + val resolvedCond = condition.map(resolve(_, target)) DeleteAction(resolvedCond) case UpdateAction(condition, assignments) => - val resolvedCond = condition.map(resolve(_, merge)) + val resolvedCond = condition.map(resolve(_, target)) val resolvedAssignments = assignments.map { assignment => assignment.copy( - key = resolve(assignment.key, merge), - value = resolve(assignment.value, merge)) + key = resolve(assignment.key, target), + value = resolve(assignment.value, target)) } UpdateAction(resolvedCond, resolvedAssignments) case UpdateStarAction(condition) => - val resolvedCond = condition.map(resolve(_, merge)) + val resolvedCond = condition.map(resolve(_, target)) val resolvedAssignments = target.output.map { attr => Assignment(attr, resolve(UnresolvedAttribute.quotedString(attr.name), source)) } UpdateAction(resolvedCond, resolvedAssignments) - case InsertAction(condition, assignments) => - val resolvedCond = condition.map(resolve(_, source)) - val resolvedAssignments = assignments.map { - assignment => - assignment.copy( - key = resolve(assignment.key, source), - value = resolve(assignment.value, source)) - } - InsertAction(resolvedCond, resolvedAssignments) - case InsertStarAction(condition) => - val resolvedCond = condition.map(resolve(_, source)) - val resolvedAssignments = target.output.map { - attr => Assignment(attr, resolve(UnresolvedAttribute.quotedString(attr.name), source)) - } - InsertAction(resolvedCond, resolvedAssignments) - case _ => - throw new RuntimeException(s"Can't recognize this action: $action") } } - val resolvedCond = resolve(condition, merge) - val resolvedMatched: Seq[MergeAction] = matched.map(resolveMergeAction) - val resolvedNotMatched: Seq[MergeAction] = notMatched.map(resolveMergeAction) + merge.notMatchedBySourceActions.map(resolveMergeAction) + } - merge.copy(target, source, resolvedCond, resolvedMatched, resolvedNotMatched) + def build( + merge: MergeIntoTable, + resolvedCond: Expression, + resolvedMatched: Seq[MergeAction], + resolvedNotMatched: Seq[MergeAction], + resolvedNotMatchedBySource: Seq[MergeAction]): MergeIntoTable = { + merge.copy( + mergeCondition = resolvedCond, + matchedActions = resolvedMatched, + notMatchedActions = resolvedNotMatched, + notMatchedBySourceActions = resolvedNotMatchedBySource) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala new file mode 100644 index 000000000000..762aaf065202 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, InsertStarAction, LogicalPlan, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction} + +trait PaimonMergeIntoResolverBase extends ExpressionHelper { + + def apply(merge: MergeIntoTable, spark: SparkSession): LogicalPlan = { + val target = merge.targetTable + val source = merge.sourceTable + assert(target.resolved, "Target should have been resolved here.") + assert(source.resolved, "Source should have been resolved here.") + + val condition = merge.mergeCondition + val matched = merge.matchedActions + val notMatched = merge.notMatchedActions + + val resolve: (Expression, LogicalPlan) => Expression = resolveExpression(spark) _ + + def resolveMergeAction(action: MergeAction): MergeAction = { + action match { + case DeleteAction(condition) => + val resolvedCond = condition.map(resolve(_, merge)) + DeleteAction(resolvedCond) + case UpdateAction(condition, assignments) => + val resolvedCond = condition.map(resolve(_, merge)) + val resolvedAssignments = assignments.map { + assignment => + assignment.copy( + key = resolve(assignment.key, merge), + value = resolve(assignment.value, merge)) + } + UpdateAction(resolvedCond, resolvedAssignments) + case UpdateStarAction(condition) => + val resolvedCond = condition.map(resolve(_, merge)) + val resolvedAssignments = target.output.map { + attr => Assignment(attr, resolve(UnresolvedAttribute.quotedString(attr.name), source)) + } + UpdateAction(resolvedCond, resolvedAssignments) + case InsertAction(condition, assignments) => + val resolvedCond = condition.map(resolve(_, source)) + val resolvedAssignments = assignments.map { + assignment => + assignment.copy( + key = resolve(assignment.key, source), + value = resolve(assignment.value, source)) + } + InsertAction(resolvedCond, resolvedAssignments) + case InsertStarAction(condition) => + val resolvedCond = condition.map(resolve(_, source)) + val resolvedAssignments = target.output.map { + attr => Assignment(attr, resolve(UnresolvedAttribute.quotedString(attr.name), source)) + } + InsertAction(resolvedCond, resolvedAssignments) + case _ => + throw new RuntimeException(s"Can't recognize this action: $action") + } + } + + val resolvedCond = resolve(condition, merge) + val resolvedMatched: Seq[MergeAction] = matched.map(resolveMergeAction) + val resolvedNotMatched: Seq[MergeAction] = notMatched.map(resolveMergeAction) + val resolvedNotMatchedBySource: Seq[MergeAction] = + resolveNotMatchedBySourceActions(merge, target, source, resolve) + + build(merge, resolvedCond, resolvedMatched, resolvedNotMatched, resolvedNotMatchedBySource) + } + + def resolveNotMatchedBySourceActions( + merge: MergeIntoTable, + target: LogicalPlan, + source: LogicalPlan, + resolve: (Expression, LogicalPlan) => Expression): Seq[MergeAction] + + def build( + merge: MergeIntoTable, + resolvedCond: Expression, + resolvedMatched: Seq[MergeAction], + resolvedNotMatched: Seq[MergeAction], + resolvedNotMatchedBySource: Seq[MergeAction]): MergeIntoTable +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index 4d3f43152db6..f4472f2efa85 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -44,7 +44,8 @@ case class MergeIntoPaimonTable( sourceTable: LogicalPlan, mergeCondition: Expression, matchedActions: Seq[MergeAction], - notMatchedActions: Seq[MergeAction]) + notMatchedActions: Seq[MergeAction], + notMatchedBySourceActions: Seq[MergeAction]) extends PaimonLeafRunnableCommand with WithFileStoreTable with ExpressionHelper @@ -100,6 +101,7 @@ case class MergeIntoPaimonTable( val sourceRowNotMatched = resolveOnJoinedPlan(Seq(col(TARGET_ROW_COL).isNull.expr)).head val matchedExprs = matchedActions.map(_.condition.getOrElse(TrueLiteral)) val notMatchedExprs = notMatchedActions.map(_.condition.getOrElse(TrueLiteral)) + val notMatchedBySourceExprs = notMatchedBySourceActions.map(_.condition.getOrElse(TrueLiteral)) val matchedOutputs = matchedActions.map { case UpdateAction(_, assignments) => assignments.map(_.value) :+ Literal(RowKind.UPDATE_AFTER.toByteValue) @@ -108,6 +110,14 @@ case class MergeIntoPaimonTable( case _ => throw new RuntimeException("should not be here.") } + val notMatchedBySourceOutputs = notMatchedBySourceActions.map { + case UpdateAction(_, assignments) => + assignments.map(_.value) :+ Literal(RowKind.UPDATE_AFTER.toByteValue) + case DeleteAction(_) => + targetOutput :+ Literal(RowKind.DELETE.toByteValue) + case _ => + throw new RuntimeException("should not be here.") + } val notMatchedOutputs = notMatchedActions.map { case InsertAction(_, assignments) => assignments.map(_.value) :+ Literal(RowKind.INSERT.toByteValue) @@ -126,6 +136,8 @@ case class MergeIntoPaimonTable( sourceRowNotMatched, matchedExprs, matchedOutputs, + notMatchedBySourceExprs, + notMatchedBySourceOutputs, notMatchedExprs, notMatchedOutputs, noopOutput, @@ -169,6 +181,8 @@ object MergeIntoPaimonTable { sourceRowHasNoMatch: Expression, matchedConditions: Seq[Expression], matchedOutputs: Seq[Seq[Expression]], + notMatchedBySourceConditions: Seq[Expression], + notMatchedBySourceOutputs: Seq[Seq[Expression]], notMatchedConditions: Seq[Expression], notMatchedOutputs: Seq[Seq[Expression]], noopCopyOutput: Seq[Expression], @@ -193,6 +207,8 @@ object MergeIntoPaimonTable { val sourceRowHasNoMatchPred = generatePredicate(sourceRowHasNoMatch) val matchedPreds = matchedConditions.map(generatePredicate) val matchedProjs = matchedOutputs.map(generateProjection) + val notMatchedBySourcePreds = notMatchedBySourceConditions.map(generatePredicate) + val notMatchedBySourceProjs = notMatchedBySourceOutputs.map(generateProjection) val notMatchedPreds = notMatchedConditions.map(generatePredicate) val notMatchedProjs = notMatchedOutputs.map(generateProjection) val noopCopyProj = generateProjection(noopCopyOutput) @@ -200,7 +216,15 @@ object MergeIntoPaimonTable { def processRow(inputRow: InternalRow): InternalRow = { if (targetRowHasNoMatchPred.eval(inputRow)) { - noopCopyProj.apply(inputRow) + val pair = notMatchedBySourcePreds.zip(notMatchedBySourceProjs).find { + case (predicate, _) => predicate.eval(inputRow) + } + + pair match { + case Some((_, projections)) => + projections.apply(inputRow) + case None => noopCopyProj.apply(inputRow) + } } else if (sourceRowHasNoMatchPred.eval(inputRow)) { val pair = notMatchedPreds.zip(notMatchedProjs).find { case (predicate, _) => predicate.eval(inputRow) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala index 63d73a09ab75..cace728f8bfe 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala @@ -97,6 +97,33 @@ class MergeIntoTableTest extends PaimonSparkTestBase { } } + test(s"Paimon MergeInto: only not matched by source") { + withTable("source", "target") { + + Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source") + + spark.sql(s""" + |CREATE TABLE target (a INT, b INT, c STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='2') + |""".stripMargin) + spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (5, 50, 'c5')") + + spark.sql(s""" + |MERGE INTO target + |USING source + |ON target.a = source.a + |WHEN NOT MATCHED BY SOURCE AND a % 2 = 0 THEN + |UPDATE SET b = b * 10 + |WHEN NOT MATCHED BY SOURCE THEN + |DELETE + |""".stripMargin) + + checkAnswer( + spark.sql("SELECT * FROM target ORDER BY a, b"), + Row(1, 10, "c1") :: Row(2, 200, "c2") :: Nil) + } + } + test(s"Paimon MergeInto: update + insert") { withTable("source", "target") { @@ -251,13 +278,15 @@ class MergeIntoTableTest extends PaimonSparkTestBase { |ON target.a = source.a |WHEN MATCHED THEN |UPDATE SET * - |WHEN NOT MATCHED - |THEN INSERT * + |WHEN NOT MATCHED THEN + |INSERT * + |WHEN NOT MATCHED BY SOURCE THEN + |DELETE |""".stripMargin) checkAnswer( spark.sql("SELECT * FROM target ORDER BY a, b"), - Row(1, 100, "c11") :: Row(2, 20, "c2") :: Row(3, 300, "c33") :: Nil) + Row(1, 100, "c11") :: Row(3, 300, "c33") :: Nil) } } @@ -289,14 +318,18 @@ class MergeIntoTableTest extends PaimonSparkTestBase { |INSERT (a, b, c) VALUES (a, b * 1.1, c) |WHEN NOT MATCHED THEN |INSERT * + |WHEN NOT MATCHED BY SOURCE AND a = 2 THEN + |UPDATE SET b = b * 10 + |WHEN NOT MATCHED BY SOURCE THEN + |DELETE |""".stripMargin) checkAnswer( spark.sql("SELECT * FROM target ORDER BY a, b"), - Row(2, 20, "c2") :: Row(3, 300, "c33") :: Row(4, 40, "c4") :: Row(5, 550, "c5") :: Row( - 7, - 700, - "c77") :: Row(9, 990, "c99") :: Nil + Row(2, 200, "c2") :: Row(3, 300, "c33") :: Row(5, 550, "c5") :: Row(7, 700, "c77") :: Row( + 9, + 990, + "c99") :: Nil ) } } @@ -547,11 +580,13 @@ class MergeIntoTableTest extends PaimonSparkTestBase { |ON target.a = source.a |WHEN MATCHED THEN |UPDATE SET c.c1 = source.c1 + |WHEN NOT MATCHED BY SOURCE THEN + |UPDATE set c.c2 = "y2" |""".stripMargin) checkAnswer( spark.sql("SELECT * FROM target ORDER BY a"), - Row(1, 10, Row("x1", "y")) :: Row(2, 20, Row("x", "y")) :: Nil) + Row(1, 10, Row("x1", "y")) :: Row(2, 20, Row("x", "y2")) :: Nil) } }