From f2327257bb1410c7900ea01130052c7773e86614 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Mon, 1 Apr 2024 20:41:59 +0800 Subject: [PATCH] [spark] Support UPDATE for append table (#3129) --- .../paimon/utils/FileStorePathFactory.java | 12 +- .../spark/procedure/CompactProcedure.java | 8 +- .../apache/paimon/spark/PaimonSplitScan.scala | 36 ++++ .../catalyst/analysis/PaimonDeleteTable.scala | 2 +- .../analysis/PaimonMergeIntoBase.scala | 2 +- .../catalyst/analysis/PaimonUpdateTable.scala | 38 +++- .../catalyst/analysis/RowLevelHelper.scala | 23 +-- .../spark/catalyst/analysis/RowLevelOp.scala | 36 +++- .../expressions/ExpressionHelper.scala | 53 +++--- .../expressions/ExpressionUtils.scala | 22 +++ .../DeleteFromPaimonTableCommand.scala | 1 - .../paimon/spark/commands/PaimonCommand.scala | 4 +- .../spark/commands/PaimonSparkWriter.scala | 17 +- .../spark/commands/SparkDataFileMeta.scala | 64 +++++++ .../commands/UpdatePaimonTableCommand.scala | 168 +++++++++++++++--- .../spark/commands/WithFileStoreTable.scala | 4 + .../spark/commands/WriteIntoPaimonTable.scala | 17 +- .../spark/sql/MergeIntoTableTestBase.scala | 2 +- .../paimon/spark/sql/UpdateTableTest.scala | 79 +++++++- 19 files changed, 479 insertions(+), 109 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java index 903a814e13e2..0f3ad7fec72b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java @@ -101,8 +101,16 @@ public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bu } public Path bucketPath(BinaryRow partition, int bucket) { - return new Path( - root + "/" + getPartitionString(partition) + "/" + BUCKET_PATH_PREFIX + bucket); + return new Path(root + "/" + relativePartitionAndBucketPath(partition, bucket)); + } + + public Path relativePartitionAndBucketPath(BinaryRow partition, int bucket) { + String partitionPath = getPartitionString(partition); + if (partitionPath.isEmpty()) { + return new Path(BUCKET_PATH_PREFIX + bucket); + } else { + return new Path(getPartitionString(partition) + "/" + BUCKET_PATH_PREFIX + bucket); + } } /** IMPORTANT: This method is NOT THREAD SAFE. */ diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 111e9f75ca2b..6396336442c8 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -28,7 +28,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.spark.DynamicOverWrite$; import org.apache.paimon.spark.SparkUtils; -import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper; +import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils; import org.apache.paimon.spark.commands.WriteIntoPaimonTable; import org.apache.paimon.spark.sort.TableSorter; import org.apache.paimon.table.BucketMode; @@ -143,9 +143,9 @@ public InternalRow[] call(InternalRow args) { LogicalPlan relation = createRelation(tableIdent); Expression condition = null; if (!StringUtils.isBlank(finalWhere)) { - condition = ExpressionHelper.resolveFilter(spark(), relation, finalWhere); + condition = ExpressionUtils.resolveFilter(spark(), relation, finalWhere); checkArgument( - ExpressionHelper.onlyHasPartitionPredicate( + ExpressionUtils.isValidPredicate( spark(), condition, table.partitionKeys().toArray(new String[0])), @@ -188,7 +188,7 @@ private boolean execute( Predicate filter = condition == null ? null - : ExpressionHelper.convertConditionToPaimonPredicate( + : ExpressionUtils.convertConditionToPaimonPredicate( condition, relation.output(), table.rowType()); switch (bucketMode) { case FIXED: diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala new file mode 100644 index 000000000000..e86f4caf64c9 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark + +import org.apache.paimon.table.Table +import org.apache.paimon.table.source.{DataSplit, Split} + +import org.apache.spark.sql.connector.read.{Batch, Scan} +import org.apache.spark.sql.types.StructType + +/** For internal use only. */ +case class PaimonSplitScan(table: Table, dataSplits: Array[DataSplit]) extends Scan { + + override def readSchema(): StructType = SparkTypeUtils.fromPaimonRowType(table.rowType()) + + override def toBatch: Batch = { + PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder) + } + +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala index 8ef9de1cc374..f2800d742556 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala @@ -30,7 +30,7 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper { override def apply(plan: LogicalPlan): LogicalPlan = { plan.resolveOperators { case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved => - checkPaimonTable(table) + checkPaimonTable(table.getTable) DeleteFromPaimonTableCommand(table, d) } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala index f95185699ac4..c07b58399883 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala @@ -46,7 +46,7 @@ trait PaimonMergeIntoBase val v2Table = relation.table.asInstanceOf[SparkTable] val targetOutput = relation.output - checkPaimonTable(v2Table) + checkPaimonTable(v2Table.getTable) checkCondition(merge.mergeCondition) merge.matchedActions.flatMap(_.condition).foreach(checkCondition) merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala index 940cb0e14cc6..e369c46e2e56 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala @@ -18,27 +18,47 @@ package org.apache.paimon.spark.catalyst.analysis -import org.apache.paimon.CoreOptions import org.apache.paimon.spark.commands.UpdatePaimonTableCommand +import org.apache.paimon.table.FileStoreTable +import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule -object PaimonUpdateTable extends Rule[LogicalPlan] with RowLevelHelper { +import scala.collection.JavaConverters._ + +object PaimonUpdateTable + extends Rule[LogicalPlan] + with RowLevelHelper + with AssignmentAlignmentHelper { override val operation: RowLevelOp = Update override def apply(plan: LogicalPlan): LogicalPlan = { plan.resolveOperators { - case u @ UpdateTable(PaimonRelation(table), assignments, _) if u.resolved => - checkPaimonTable(table) + case u @ UpdateTable(PaimonRelation(table), assignments, condition) if u.resolved => + checkPaimonTable(table.getTable) - val primaryKeys = table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",") - if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) { - throw new RuntimeException("Can't update the primary key column.") - } + table.getTable match { + case paimonTable: FileStoreTable => + val primaryKeys = paimonTable.primaryKeys().asScala + if (primaryKeys.isEmpty) { + condition.foreach(checkSubquery) + } + if (!validUpdateAssignment(u.table.outputSet, primaryKeys, assignments)) { + throw new RuntimeException("Can't update the primary key column.") + } - UpdatePaimonTableCommand(u) + val relation = PaimonRelation.getPaimonRelation(u.table) + UpdatePaimonTableCommand( + relation, + paimonTable, + condition.getOrElse(TrueLiteral), + assignments) + + case _ => + throw new RuntimeException("Update Operation is only supported for FileStoreTable.") + } } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala index 659d84dabae0..9981e7d3cc7c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala @@ -18,33 +18,18 @@ package org.apache.paimon.spark.catalyst.analysis -import org.apache.paimon.CoreOptions.MERGE_ENGINE -import org.apache.paimon.options.Options -import org.apache.paimon.spark.SparkTable +import org.apache.paimon.table.Table import org.apache.spark.sql.catalyst.SQLConfHelper -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical.Assignment trait RowLevelHelper extends SQLConfHelper { val operation: RowLevelOp - protected def checkPaimonTable(table: SparkTable): Unit = { - val paimonTable = if (table.getTable.primaryKeys().size() > 0) { - table.getTable - } else { - throw new UnsupportedOperationException( - s"Only support to $operation table with primary keys.") - } - - val options = Options.fromMap(paimonTable.options) - val mergeEngine = options.get(MERGE_ENGINE) - if (!operation.supportedMergeEngine.contains(mergeEngine)) { - throw new UnsupportedOperationException( - s"merge engine $mergeEngine can not support $operation, currently only ${operation.supportedMergeEngine - .mkString(", ")} can support $operation.") - } + protected def checkPaimonTable(table: Table): Unit = { + operation.checkValidity(table) } protected def checkSubquery(condition: Expression): Unit = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala index 3d4fe088920a..f83cf91d86e2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala @@ -18,28 +18,54 @@ package org.apache.paimon.spark.catalyst.analysis -import org.apache.paimon.CoreOptions.MergeEngine +import org.apache.paimon.CoreOptions.{MERGE_ENGINE, MergeEngine} +import org.apache.paimon.options.Options +import org.apache.paimon.table.Table sealed trait RowLevelOp { - val supportedMergeEngine: Seq[MergeEngine] + + val name: String = this.getClass.getSimpleName.stripSuffix("$") + + protected val supportedMergeEngine: Seq[MergeEngine] + + protected val supportAppendOnlyTable: Boolean + + def checkValidity(table: Table): Unit = { + if (!supportAppendOnlyTable && table.primaryKeys().isEmpty) { + throw new UnsupportedOperationException(s"Only support to $name table with primary keys.") + } + + val mergeEngine = Options.fromMap(table.options).get(MERGE_ENGINE) + if (!supportedMergeEngine.contains(mergeEngine)) { + throw new UnsupportedOperationException( + s"merge engine $mergeEngine can not support $name, currently only ${supportedMergeEngine + .mkString(", ")} can support $name.") + } + } } case object Delete extends RowLevelOp { - override def toString: String = "delete" override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE) + + override val supportAppendOnlyTable: Boolean = false + } case object Update extends RowLevelOp { - override def toString: String = "update" override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE) + + override val supportAppendOnlyTable: Boolean = true + } case object MergeInto extends RowLevelOp { - override def toString: String = "merge into" override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE) + + override val supportAppendOnlyTable: Boolean = false + } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala index 65f2a04bde53..3e09557d564b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala @@ -24,6 +24,7 @@ import org.apache.paimon.types.RowType import org.apache.spark.sql.SparkSession import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter} +import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.internal.SQLConf @@ -100,35 +101,37 @@ trait ExpressionHelper extends PredicateHelper { throw new UnsupportedOperationException( s"Unsupported update expression: $other, only support update with PrimitiveType and StructType.") } -} - -object ExpressionHelper { - - case class FakeLogicalPlan(exprs: Seq[Expression], children: Seq[LogicalPlan]) - extends LogicalPlan { - override def output: Seq[Attribute] = Nil - override protected def withNewChildrenInternal( - newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children = newChildren) - } - - def resolveFilter(spark: SparkSession, plan: LogicalPlan, where: String): Expression = { - val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(where) + def resolveFilter(spark: SparkSession, plan: LogicalPlan, conditionSql: String): Expression = { + val unResolvedExpression = spark.sessionState.sqlParser.parseExpression(conditionSql) val filter = Filter(unResolvedExpression, plan) spark.sessionState.analyzer.execute(filter) match { case filter: Filter => filter.condition - case _ => throw new RuntimeException(s"Could not resolve expression $where in plan: $plan") + case _ => + throw new RuntimeException(s"Could not resolve expression $conditionSql in plan: $plan") } } - def onlyHasPartitionPredicate( + def isPredicatePartitionColumnsOnly( + condition: Expression, + partitionColumns: Seq[String], + resolver: Resolver + ): Boolean = { + condition.references.forall(r => partitionColumns.exists(resolver(r.name, _))) + } + + /** + * A valid predicate should meet two requirements: 1) This predicate only contains partition + * columns. 2) This predicate doesn't contain subquery. + */ + def isValidPredicate( spark: SparkSession, expr: Expression, partitionCols: Array[String]): Boolean = { - val resolvedNameEquals = spark.sessionState.analyzer.resolver + val resolver = spark.sessionState.analyzer.resolver splitConjunctivePredicates(expr).forall( e => - e.references.forall(r => partitionCols.exists(resolvedNameEquals(r.name, _))) && + isPredicatePartitionColumnsOnly(e, partitionCols, resolver) && !SubqueryExpression.hasSubquery(expr)) } @@ -148,12 +151,16 @@ object ExpressionHelper { val predicates = filters.map(converter.convert) PredicateBuilder.and(predicates: _*) } +} - def splitConjunctivePredicates(condition: Expression): Seq[Expression] = { - condition match { - case And(cond1, cond2) => - splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2) - case other => other :: Nil - } +object ExpressionHelper { + + case class FakeLogicalPlan(exprs: Seq[Expression], children: Seq[LogicalPlan]) + extends LogicalPlan { + override def output: Seq[Attribute] = Nil + + override protected def withNewChildrenInternal( + newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children = newChildren) } + } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala new file mode 100644 index 000000000000..b0c97c62046a --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalyst.analysis.expressions + +/** This wrapper is only used in java code, e.g. Procedure. */ +object ExpressionUtils extends ExpressionHelper diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala index 9f79664bead4..e4bf22d0fba2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -21,7 +21,6 @@ package org.apache.paimon.spark.commands import org.apache.paimon.options.Options import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor import org.apache.paimon.spark.{InsertInto, SparkTable} -import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper.convertConditionToPaimonPredicate import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL import org.apache.paimon.table.FileStoreTable diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala index 02a0e0cc2ea8..ba404704b718 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala @@ -19,13 +19,13 @@ package org.apache.paimon.spark.commands import org.apache.paimon.spark.SparkFilterConverter +import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper import org.apache.paimon.types.RowType -import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter} /** Helper trait for all paimon commands. */ -trait PaimonCommand extends WithFileStoreTable with PredicateHelper { +trait PaimonCommand extends WithFileStoreTable with ExpressionHelper { /** * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate` diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index edccb4989a8c..da269d4864eb 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -35,7 +35,7 @@ import java.io.IOException import scala.collection.JavaConverters._ -trait PaimonSparkWriter extends WithFileStoreTable { +case class PaimonSparkWriter(table: FileStoreTable) { private lazy val tableSchema = table.schema @@ -52,7 +52,9 @@ trait PaimonSparkWriter extends WithFileStoreTable { private lazy val serializer = new CommitMessageSerializer - def write(data: Dataset[_], writeBuilder: BatchWriteBuilder): Seq[CommitMessage] = { + val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder() + + def write(data: Dataset[_]): Seq[CommitMessage] = { val sparkSession = data.sparkSession import sparkSession.implicits._ @@ -101,6 +103,17 @@ trait PaimonSparkWriter extends WithFileStoreTable { commitMessages.toSeq } + def commit(commitMessages: Seq[CommitMessage]): Unit = { + val tableCommit = writeBuilder.newCommit() + try { + tableCommit.commit(commitMessages.toList.asJava) + } catch { + case e: Throwable => throw new RuntimeException(e); + } finally { + tableCommit.close() + } + } + /** assign a valid bucket id for each of record. */ private def assignBucketId( sparkSession: SparkSession, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala new file mode 100644 index 000000000000..cb3266157726 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.commands + +import org.apache.paimon.data.BinaryRow +import org.apache.paimon.io.DataFileMeta +import org.apache.paimon.table.source.DataSplit +import org.apache.paimon.utils.FileStorePathFactory + +import scala.collection.JavaConverters._ + +case class SparkDataFileMeta( + partition: BinaryRow, + bucket: Int, + totalBuckets: Int, + dataFileMeta: DataFileMeta) { + + def relativePath(fileStorePathFactory: FileStorePathFactory): String = { + fileStorePathFactory + .relativePartitionAndBucketPath(partition, bucket) + .toUri + .toString + "/" + dataFileMeta.fileName() + } +} + +object SparkDataFileMeta { + def convertToSparkDataFileMeta( + dataSplit: DataSplit, + totalBuckets: Int): Seq[SparkDataFileMeta] = { + dataSplit.dataFiles().asScala.map { + file => SparkDataFileMeta(dataSplit.partition, dataSplit.bucket, totalBuckets, file) + } + } + + def convertToDataSplits(sparkDataFiles: Array[SparkDataFileMeta]): Array[DataSplit] = { + sparkDataFiles + .groupBy(file => (file.partition, file.bucket)) + .map { + case ((partition, bucket), files) => + new DataSplit.Builder() + .withPartition(partition) + .withBucket(bucket) + .withDataFiles(files.map(_.dataFileMeta).toList.asJava) + .build() + } + .toArray + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala index 45ef870f2d29..3bc236a868f9 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala @@ -18,45 +18,169 @@ package org.apache.paimon.spark.commands -import org.apache.paimon.options.Options -import org.apache.paimon.spark.{InsertInto, SparkTable} -import org.apache.paimon.spark.catalyst.analysis.{AssignmentAlignmentHelper, PaimonRelation} +import org.apache.paimon.index.IndexFileMeta +import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement} +import org.apache.paimon.spark.PaimonSplitScan +import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper +import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl} +import org.apache.paimon.table.source.DataSplit import org.apache.paimon.types.RowKind -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{Column, Row, SparkSession} import org.apache.spark.sql.Utils.createDataset -import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If} import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral -import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, UpdateTable} -import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, Project, SupportsSubquery} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation} +import org.apache.spark.sql.functions.{input_file_name, lit} -case class UpdatePaimonTableCommand(u: UpdateTable) +import java.net.URI +import java.util.Collections + +import scala.collection.JavaConverters._ + +case class UpdatePaimonTableCommand( + relation: DataSourceV2Relation, + override val table: FileStoreTable, + condition: Expression, + assignments: Seq[Assignment]) extends PaimonLeafRunnableCommand - with AssignmentAlignmentHelper { + with PaimonCommand + with AssignmentAlignmentHelper + with SupportsSubquery { - override def run(sparkSession: SparkSession): Seq[Row] = { + private lazy val writer = PaimonSparkWriter(table) - val relation = PaimonRelation.getPaimonRelation(u.table) + private lazy val updateExpressions = { + generateAlignedExpressions(relation.output, assignments).zip(relation.output).map { + case (expr, attr) => Alias(expr, attr.name)() + } + } - val updatedExprs: Seq[Alias] = - generateAlignedExpressions(relation.output, u.assignments).zip(relation.output).map { - case (expr, attr) => Alias(expr, attr.name)() - } + override def run(sparkSession: SparkSession): Seq[Row] = { - val updatedPlan = Project(updatedExprs, Filter(u.condition.getOrElse(TrueLiteral), relation)) + val commitMessages = if (withPrimaryKeys) { + performUpdateForPkTable(sparkSession) + } else { + performUpdateForNonPkTable(sparkSession) + } + writer.commit(commitMessages) + Seq.empty[Row] + } + + /** Update for table with primary keys */ + private def performUpdateForPkTable(sparkSession: SparkSession): Seq[CommitMessage] = { + val updatedPlan = Project(updateExpressions, Filter(condition, relation)) val df = createDataset(sparkSession, updatedPlan) .withColumn(ROW_KIND_COL, lit(RowKind.UPDATE_AFTER.toByteValue)) + writer.write(df) + } - WriteIntoPaimonTable( - relation.table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable], - InsertInto, - df, - Options.fromMap(relation.options)).run(sparkSession) + /** Update for table without primary keys */ + private def performUpdateForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = { + // Step1: the candidate data splits which are filtered by Paimon Predicate. + val candidateDataSplits = findCandidateDataSplits() - Seq.empty[Row] + val commitMessages = if (candidateDataSplits.isEmpty) { + // no data spilt need to be rewrote + logDebug("No file need to rerote. It's an empty Commit.") + Seq.empty[CommitMessage] + } else { + import sparkSession.implicits._ + + // Step2: extract out the exactly files, which must contain record to be updated. + val scan = PaimonSplitScan(table, candidateDataSplits.toArray) + val filteredRelation = + Filter(condition, DataSourceV2ScanRelation(relation, scan, relation.output)) + val touchedFilePaths = createDataset(sparkSession, filteredRelation) + .select(input_file_name()) + .distinct() + .as[String] + .collect() + .map(relativePath) + + // Step3: build a new list of data splits which compose of those files. + // Those are expected to be the smallest range of data files that need to be rewritten. + val totalBuckets = table.coreOptions().bucket() + val candidateDataFiles = candidateDataSplits + .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, totalBuckets)) + val fileStorePathFactory = table.store().pathFactory() + val fileNameToMeta = + candidateDataFiles + .map(file => (file.relativePath(fileStorePathFactory), file)) + .toMap + val touchedFiles: Array[SparkDataFileMeta] = touchedFilePaths.map { + file => fileNameToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file")) + } + val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles) + + // Step4: build a dataframe that contains the unchanged and updated data, and write out them. + val columns = updateExpressions.zip(relation.output).map { + case (update, origin) => + val updated = if (condition == TrueLiteral) { + update + } else { + If(condition, update, origin) + } + new Column(updated).as(origin.name, origin.metadata) + } + val toUpdateScanRelation = DataSourceV2ScanRelation( + relation, + PaimonSplitScan(table, touchedDataSplits), + relation.output) + val data = createDataset(sparkSession, toUpdateScanRelation).select(columns: _*) + val addCommitMessage = writer.write(data) + + // Step5: convert the files that need to be wrote to commit message. + val deletedCommitMessage = touchedFiles + .groupBy(f => (f.partition, f.bucket)) + .map { + case ((partition, bucket), files) => + val bb = files.map(_.dataFileMeta).toList.asJava + val newFilesIncrement = new DataIncrement( + Collections.emptyList[DataFileMeta], + bb, + Collections.emptyList[DataFileMeta]) + buildCommitMessage( + new CommitMessageImpl(partition, bucket, newFilesIncrement, null, null)) + } + .toSeq + + addCommitMessage ++ deletedCommitMessage + } + commitMessages + } + + private def findCandidateDataSplits(): Seq[DataSplit] = { + val snapshotReader = table.newSnapshotReader() + if (condition == TrueLiteral) { + val filter = convertConditionToPaimonPredicate(condition, relation.output, rowType) + snapshotReader.withFilter(filter) + } + + snapshotReader.read().splits().asScala.collect { case s: DataSplit => s } + } + + /** Gets a relative path against the table path. */ + private def relativePath(absolutePath: String): String = { + val location = table.location().toUri + location.relativize(new URI(absolutePath)).toString + } + + private def buildCommitMessage(o: CommitMessageImpl): CommitMessage = { + new CommitMessageImpl( + o.partition, + o.bucket, + o.newFilesIncrement, + new CompactIncrement( + Collections.emptyList[DataFileMeta], + Collections.emptyList[DataFileMeta], + Collections.emptyList[DataFileMeta]), + new IndexIncrement(Collections.emptyList[IndexFileMeta])); } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala index 55be494986dd..1d447281e8be 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala @@ -19,9 +19,13 @@ package org.apache.paimon.spark.commands import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.types.RowType private[spark] trait WithFileStoreTable { def table: FileStoreTable + def withPrimaryKeys: Boolean = !table.primaryKeys().isEmpty + + def rowType: RowType = table.rowType() } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala index b3077eb8a6a0..905c9cdfb7ff 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala @@ -39,7 +39,6 @@ case class WriteIntoPaimonTable( options: Options) extends RunnableCommand with PaimonCommand - with PaimonSparkWriter with SchemaHelper with Logging { @@ -57,20 +56,12 @@ case class WriteIntoPaimonTable( updateTableWithOptions( Map(DYNAMIC_PARTITION_OVERWRITE.key -> dynamicPartitionOverwriteMode.toString)) - val writeBuilder = table.newBatchWriteBuilder() + val writer = PaimonSparkWriter(table) if (overwritePartition != null) { - writeBuilder.withOverwrite(overwritePartition.asJava) - } - - val commitMessages = write(data, writeBuilder) - val tableCommit = writeBuilder.newCommit() - try { - tableCommit.commit(commitMessages.toList.asJava) - } catch { - case e: Throwable => throw new RuntimeException(e); - } finally { - tableCommit.close() + writer.writeBuilder.withOverwrite(overwritePartition.asJava) } + val commitMessages = writer.write(data) + writer.commit(commitMessages) Seq.empty } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala index a853bcb3bf7a..bf8e72f7920f 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala @@ -618,7 +618,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase { |THEN INSERT (a, b, c) values (a, b, c) |""".stripMargin) }.getMessage - assert(error.contains("Only support to merge into table with primary keys.")) + assert(error.contains("Only support to MergeInto table with primary keys.")) } } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala index 65261f3de8d0..cc95e7a90a7f 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala @@ -26,15 +26,86 @@ import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} class UpdateTableTest extends PaimonSparkTestBase { - test(s"test update append only table") { + import testImplicits._ + + test(s"Paimon Update: append-only table") { spark.sql(s""" |CREATE TABLE T (id INT, name STRING, dt STRING) |""".stripMargin) - spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')") + spark.sql(""" + |INSERT INTO T + |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025') + |""".stripMargin) + + spark.sql("UPDATE T SET name = 'a_new' WHERE id = 1") + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY id"), + Seq((1, "a_new", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", "2025")).toDF() + ) + + val snapshotManager = loadTable("T").snapshotManager() + var lastSnapshotId = snapshotManager.latestSnapshotId() + spark.sql("UPDATE T SET name = concat(name, '2') WHERE id % 2 == 0") + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY id"), + Seq((1, "a_new", "2024"), (2, "b2", "2024"), (3, "c", "2025"), (4, "d2", "2025")).toDF() + ) + assertThat(lastSnapshotId + 1).isEqualTo(snapshotManager.latestSnapshotId()) + + lastSnapshotId = snapshotManager.latestSnapshotId() + spark.sql("UPDATE T SET name = 'empty_commit' WHERE id > 100") + // no data need to be updated, it's an empty commit. + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY id"), + Seq((1, "a_new", "2024"), (2, "b2", "2024"), (3, "c", "2025"), (4, "d2", "2025")).toDF() + ) + assertThat(lastSnapshotId).isEqualTo(snapshotManager.latestSnapshotId()) + } + + test(s"Paimon Update: append-only table with partition") { + spark.sql(s""" + |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt) + |""".stripMargin) + + spark.sql(""" + |INSERT INTO T + |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025') + |""".stripMargin) + + spark.sql("UPDATE T SET name = concat(name, '2') WHERE dt <= '2024'") + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY id"), + Seq((1, "a2", "2024"), (2, "b2", "2024"), (3, "c", "2025"), (4, "d", "2025")).toDF() + ) + + spark.sql("UPDATE T SET name = concat(name, '3') WHERE dt = '2025' and id % 2 == 1") + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY id"), + Seq((1, "a2", "2024"), (2, "b2", "2024"), (3, "c3", "2025"), (4, "d", "2025")).toDF() + ) + + spark.sql("UPDATE T SET name = concat(name, '4') WHERE id % 2 == 0") + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY id"), + Seq((1, "a2", "2024"), (2, "b24", "2024"), (3, "c3", "2025"), (4, "d4", "2025")).toDF() + ) + } + + test("Paimon Update: append-only table, condition contains subquery") { + spark.sql(s""" + |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt) + |""".stripMargin) + + spark.sql(""" + |INSERT INTO T + |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025') + |""".stripMargin) - assertThatThrownBy(() => spark.sql("UPDATE T SET name = 'a_new' WHERE id = 1")) - .hasMessageContaining("Only support to update table with primary keys.") + Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids") + assertThatThrownBy( + () => spark.sql("UPDATE T set name = 'in_new' WHERE id IN (SELECT * FROM updated_ids)")) + .hasMessageContaining("Subqueries are not supported") } CoreOptions.MergeEngine.values().foreach {