[spark] delete for append table #3140

Merged
merged 1 commit on Apr 3, 2024
@@ -16,14 +16,19 @@
  * limitations under the License.
  */

-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark.catalyst

-import org.apache.paimon.spark.SparkTable
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}

-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
+object Compatibility {

-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
-  extends DeleteFromPaimonTableCommandBase {
-  override def condition(): Expression = delete.condition.orNull
+  def createDataSourceV2ScanRelation(
+      relation: DataSourceV2Relation,
+      scan: Scan,
+      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
+    DataSourceV2ScanRelation(relation, scan, output)
+  }
+
 }
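The Compatibility object introduced here acts as a per-Spark-version shim: the DataSourceV2ScanRelation constructor appears to differ across Spark releases (for example, keyGroupedPartitioning was added around Spark 3.3), so routing the three-argument call through a module-local factory lets the shared code stay source-compatible. A hypothetical usage sketch, not part of the PR; `relation` and `scan` are assumed to be supplied by the surrounding command:

// Hypothetical helper: stays agnostic of the DataSourceV2ScanRelation signature by
// delegating to the per-version Compatibility shim instead of the constructor.
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}

object CompatibilityUsageSketch {
  def wrapScan(relation: DataSourceV2Relation, scan: Scan): DataSourceV2ScanRelation =
    Compatibility.createDataSourceV2ScanRelation(relation, scan, relation.output)
}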
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

import scala.collection.JavaConverters._

object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {

  override val operation: RowLevelOp = Delete

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.resolveOperators {
      case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved =>
        checkPaimonTable(table.getTable)

        table.getTable match {
          case paimonTable: FileStoreTable =>
            val primaryKeys = paimonTable.primaryKeys().asScala
            if (primaryKeys.isEmpty) {
              condition.foreach(checkSubquery)
            }

            val relation = PaimonRelation.getPaimonRelation(d.table)
            DeleteFromPaimonTableCommand(relation, paimonTable, condition.getOrElse(TrueLiteral))

          case _ =>
            throw new RuntimeException("Delete Operation is only supported for FileStoreTable.")
        }
    }
  }
}
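For context, this analysis rule is what ultimately lets a plain Spark SQL DELETE run against an append-only (primary-key-less) Paimon table; for such tables it also runs checkSubquery on the condition, which presumably rejects delete conditions containing subqueries. An illustrative usage sketch with hypothetical table and column names, assuming `spark` is a SparkSession whose current catalog is a Paimon catalog:

// Illustrative only; table/column names are not from the PR.
import org.apache.spark.sql.SparkSession

object AppendTableDeleteExample {
  def run(spark: SparkSession): Unit = {
    // An append-only table: no primary key declared.
    spark.sql("CREATE TABLE t (id INT, dt STRING) PARTITIONED BY (dt)")
    spark.sql("INSERT INTO t VALUES (1, '20240401'), (2, '20240402'), (3, '20240402')")

    // Equality on partition columns only: can be committed as a partition drop.
    spark.sql("DELETE FROM t WHERE dt = '20240401'")

    // Touches a non-partition column: deleted by rewriting the touched data files.
    spark.sql("DELETE FROM t WHERE id = 3")
  }
}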
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}

object Compatibility {

  def createDataSourceV2ScanRelation(
      relation: DataSourceV2Relation,
      scan: Scan,
      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
    DataSourceV2ScanRelation(relation, scan, output)
  }

}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}

object Compatibility {

  def createDataSourceV2ScanRelation(
      relation: DataSourceV2Relation,
      scan: Scan,
      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
    DataSourceV2ScanRelation(relation, scan, output)
  }

}
@@ -19,10 +19,13 @@
 package org.apache.paimon.spark.catalyst.analysis

 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+import org.apache.paimon.table.FileStoreTable

 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule

+import scala.collection.JavaConverters._
+
 object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {

   override val operation: RowLevelOp = Delete
@@ -32,7 +35,19 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
       case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved =>
         checkPaimonTable(table.getTable)

-        DeleteFromPaimonTableCommand(table, d)
+        table.getTable match {
+          case paimonTable: FileStoreTable =>
+            val primaryKeys = paimonTable.primaryKeys().asScala
+            if (primaryKeys.isEmpty) {
+              checkSubquery(condition)
+            }
+
+            val relation = PaimonRelation.getPaimonRelation(d.table)
+            DeleteFromPaimonTableCommand(relation, paimonTable, condition)
+
+          case _ =>
+            throw new RuntimeException("Delete Operation is only supported for FileStoreTable.")
+        }
     }
   }
 }
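This copy of the rule passes `condition` straight through, while the new file added above wraps it as `condition.getOrElse(TrueLiteral)`; the difference appears to track Spark changing DeleteFromTable's condition from Option[Expression] to a plain Expression across releases. A toy, dependency-free sketch of that normalization; the types here are stand-ins, not Spark classes:

// Toy model only: Expr stands in for Spark's Expression, TrueLit for Literal.TrueLiteral.
object ConditionNormalizeSketch {
  sealed trait Expr
  case object TrueLit extends Expr
  final case class Raw(sql: String) extends Expr

  // Older shape: an absent condition means "delete every row".
  def normalize(condition: Option[Expr]): Expr = condition.getOrElse(TrueLit)

  def main(args: Array[String]): Unit = {
    println(normalize(None))                 // TrueLit
    println(normalize(Some(Raw("id > 10")))) // Raw(id > 10)
  }
}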
@@ -48,7 +48,7 @@ case object Delete extends RowLevelOp {

   override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE)

-  override val supportAppendOnlyTable: Boolean = false
+  override val supportAppendOnlyTable: Boolean = true

 }
@@ -112,6 +112,17 @@ trait ExpressionHelper extends PredicateHelper {
     }
   }

+  def splitPruePartitionAndOtherPredicates(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      resolver: Resolver): (Seq[Expression], Seq[Expression]) = {
+    splitConjunctivePredicates(condition)
+      .partition {
+        isPredicatePartitionColumnsOnly(_, partitionColumns, resolver) && !SubqueryExpression
+          .hasSubquery(condition)
+      }
+  }
+
   def isPredicatePartitionColumnsOnly(
       condition: Expression,
       partitionColumns: Seq[String],
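The new helper splits the conjuncts of a delete condition into partition-only predicates and everything else; as written, a subquery anywhere in the condition disqualifies every conjunct from the partition-only side. A dependency-free sketch of the same splitting idea, using toy predicate types rather than Spark's Expression and Resolver:

// Toy model of conjunct splitting; Pred stands in for a Catalyst predicate.
object PredicateSplitSketch {
  final case class Pred(text: String, references: Set[String])

  def splitPartitionAndOther(
      conjuncts: Seq[Pred],
      partitionColumns: Set[String],
      hasSubquery: Boolean): (Seq[Pred], Seq[Pred]) =
    conjuncts.partition(p => p.references.subsetOf(partitionColumns) && !hasSubquery)

  def main(args: Array[String]): Unit = {
    val conjuncts = Seq(
      Pred("dt = '20240401'", Set("dt")), // partition column only
      Pred("id > 10", Set("id")))         // non-partition column
    // Prints (List(Pred(dt = '20240401',Set(dt))),List(Pred(id > 10,Set(id))))
    println(splitPartitionAndOther(conjuncts, Set("dt"), hasSubquery = false))
  }
}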
@@ -18,77 +18,123 @@

 package org.apache.paimon.spark.commands

-import org.apache.paimon.options.Options
 import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
-import org.apache.paimon.spark.{InsertInto, SparkTable}
+import org.apache.paimon.spark.PaimonSplitScan
+import org.apache.paimon.spark.catalyst.Compatibility
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind

 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.Utils.createDataset
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.functions.lit

 import java.util.{Collections, UUID}

-import scala.util.control.NonFatal
+import scala.collection.JavaConverters._
+import scala.util.Try

-trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with PaimonCommand {
-  self: DeleteFromPaimonTableCommand =>
-  override def table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
+case class DeleteFromPaimonTableCommand(
+    relation: DataSourceV2Relation,
+    override val table: FileStoreTable,
+    condition: Expression)
+  extends PaimonLeafRunnableCommand
+  with PaimonCommand
+  with ExpressionHelper
+  with SupportsSubquery {

-  private val relation = delete.table
   private lazy val writer = PaimonSparkWriter(table)

-  def condition(): Expression
+  override def run(sparkSession: SparkSession): Seq[Row] = {

-  private lazy val (deletePredicate, forceDeleteByRows) =
+    val commit = table.store.newCommit(UUID.randomUUID.toString)
     if (condition == null || condition == TrueLiteral) {
-      (None, false)
+      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
     } else {
-      try {
-        (convertConditionToPaimonPredicate(condition(), relation.output, table.rowType()), false)
-      } catch {
-        case NonFatal(_) =>
-          (None, true)
+      val (partitionCondition, otherCondition) = splitPruePartitionAndOtherPredicates(
+        condition,
+        table.partitionKeys().asScala,
+        sparkSession.sessionState.conf.resolver)
+
+      // TODO: provide another partition visitor to support more partition predicate.
+      val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
+      val partitionPredicate = if (partitionCondition.isEmpty) {
+        None
+      } else {
+        convertConditionToPaimonPredicate(
+          partitionCondition.reduce(And),
+          relation.output,
+          rowType,
+          ignoreFailure = true)
       }
-    }

-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val commit = table.store.newCommit(UUID.randomUUID.toString)
+      // We do not have to scan table if the following three requirements are met:
+      // 1) no other predicate;
+      // 2) partition condition can convert to paimon predicate;
+      // 3) partition predicate can be visit by OnlyPartitionKeyEqualVisitor.
+      val forceDeleteByRows =
+        otherCondition.nonEmpty || partitionPredicate.isEmpty || !partitionPredicate.get.visit(
+          visitor)

-    if (forceDeleteByRows) {
-      deleteRowsByCondition(sparkSession)
-    } else if (deletePredicate.isEmpty) {
-      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
-    } else {
-      val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
-      if (deletePredicate.get.visit(visitor)) {
+      if (forceDeleteByRows) {
+        val commitMessages = if (withPrimaryKeys) {
+          performDeleteForPkTable(sparkSession)
+        } else {
+          performDeleteForNonPkTable(sparkSession)
+        }
+        writer.commit(commitMessages)
+      } else {
         val dropPartitions = visitor.partitions()
         commit.dropPartitions(
           Collections.singletonList(dropPartitions),
           BatchWriteBuilder.COMMIT_IDENTIFIER)
-      } else {
-        deleteRowsByCondition(sparkSession)
       }
     }

     Seq.empty[Row]
   }

-  private def deleteRowsByCondition(sparkSession: SparkSession): Unit = {
+  def performDeleteForPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
     val df = createDataset(sparkSession, Filter(condition, relation))
       .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
+    writer.write(df)
+  }

-    WriteIntoPaimonTable(table, InsertInto, df, new Options()).run(sparkSession)
+  def performDeleteForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+    // Step1: the candidate data splits which are filtered by Paimon Predicate.
+    val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
+    val fileNameToMeta = candidateFileMap(candidateDataSplits)
+
+    // Step2: extract out the exactly files, which must have at least one record to be updated.
+    val touchedFilePaths = findTouchedFiles(candidateDataSplits, condition, relation, sparkSession)
+
+    // Step3: the smallest range of data files that need to be rewritten.
+    val touchedFiles = touchedFilePaths.map {
+      file => fileNameToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file"))
+    }
+
+    // Step4: build a dataframe that contains the unchanged data, and write out them.
+    val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
+    val toRewriteScanRelation = Filter(
+      Not(condition),
+      Compatibility.createDataSourceV2ScanRelation(
+        relation,
+        PaimonSplitScan(table, touchedDataSplits),
+        relation.output))
+    val data = createDataset(sparkSession, toRewriteScanRelation)
+    val addCommitMessage = writer.write(data)
+
+    // Step5: convert the deleted files that need to be wrote to commit message.
+    val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+
+    addCommitMessage ++ deletedCommitMessage
   }
 }
-
-case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: DeleteFromTable)
-  extends DeleteFromPaimonTableCommandBase {
-  override def condition(): Expression = delete.condition
-}
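Taken together, the rewritten run() chooses among a few strategies: truncate the table when there is no condition, drop whole partitions when the condition reduces to equality on partition keys only, and otherwise delete row by row, writing DELETE row-kind records for primary-key tables or rewriting the touched data files for append-only tables. A condensed, self-contained sketch of that branching using toy types (not the Paimon or Spark classes):

// Toy decision model of the delete strategy chosen in run(); all types are illustrative.
object DeletePlanSketch {
  sealed trait Plan
  case object TruncateTable extends Plan       // DELETE FROM t with no condition
  case object DropPartitions extends Plan      // metadata-only partition drop
  case object DeleteByRowKind extends Plan     // primary-key table: emit DELETE records
  case object RewriteTouchedFiles extends Plan // append-only table: rewrite files minus deleted rows

  final case class Condition(partitionEqualityOnly: Boolean)

  def plan(condition: Option[Condition], hasPrimaryKey: Boolean): Plan = condition match {
    case None => TruncateTable
    case Some(c) if c.partitionEqualityOnly => DropPartitions
    case _ => if (hasPrimaryKey) DeleteByRowKind else RewriteTouchedFiles
  }

  def main(args: Array[String]): Unit = {
    println(plan(None, hasPrimaryKey = false))                                            // TruncateTable
    println(plan(Some(Condition(partitionEqualityOnly = true)), hasPrimaryKey = false))   // DropPartitions
    println(plan(Some(Condition(partitionEqualityOnly = false)), hasPrimaryKey = true))   // DeleteByRowKind
    println(plan(Some(Condition(partitionEqualityOnly = false)), hasPrimaryKey = false))  // RewriteTouchedFiles
  }
}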