[spark] Delete supports all merge engines (apache#3294)
JingsongLi authored May 6, 2024
1 parent fcb7c1d commit db3a693
Showing 6 changed files with 62 additions and 28 deletions.
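In short, this commit lets Spark's DELETE work on primary-key tables with any merge engine, not just deduplicate. The deduplicate case keeps the existing delete-marker path; every other merge engine now falls back to a copy-on-write rewrite of the touched splits, written in write-only mode. A simplified sketch of the dispatch, assuming a FileStoreTable in scope (the function name deleteStrategy is illustrative, not part of the commit; the real logic lives in DeleteFromPaimonTableCommand below):

    import org.apache.paimon.CoreOptions.MergeEngine
    import org.apache.paimon.table.FileStoreTable

    // Rough sketch only: the real command also handles partition-only and
    // append-only cases that are not shown here.
    def deleteStrategy(table: FileStoreTable): String = {
      val pkDelete = !table.primaryKeys().isEmpty &&
        table.coreOptions().mergeEngine() == MergeEngine.DEDUPLICATE
      if (pkDelete) "performPrimaryKeyDelete: append RowKind.DELETE markers"
      else "performDeleteCopyOnWrite: rewrite touched splits with a write-only writer"
    }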
Changed file 1 of 6
@@ -41,7 +41,7 @@
import org.apache.paimon.table.sink.CompactionTaskSerializer;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
- import org.apache.paimon.table.source.InnerTableScan;
+ import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.SerializationUtils;
@@ -221,13 +221,13 @@ private boolean execute(

private void compactAwareBucketTable(
FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
- InnerTableScan scan = table.newScan();
+ SnapshotReader snapshotReader = table.newSnapshotReader();
if (filter != null) {
-   scan.withFilter(filter);
+   snapshotReader.withFilter(filter);
}

List<Pair<byte[], Integer>> partitionBuckets =
-   scan.plan().splits().stream()
+   snapshotReader.read().splits().stream()
.map(split -> (DataSplit) split)
.map(dataSplit -> Pair.of(dataSplit.partition(), dataSplit.bucket()))
.distinct()
Changed file 2 of 6
@@ -46,7 +46,11 @@ sealed trait RowLevelOp {

case object Delete extends RowLevelOp {

- override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE)
+ override val supportedMergeEngine: Seq[MergeEngine] = Seq(
+   MergeEngine.DEDUPLICATE,
+   MergeEngine.PARTIAL_UPDATE,
+   MergeEngine.AGGREGATE,
+   MergeEngine.FIRST_ROW)

override val supportAppendOnlyTable: Boolean = true

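With the change above, the Delete row-level operation advertises support for every merge engine, so the check in the analysis rules no longer rejects deletes on partial-update, aggregation, or first-row tables. A minimal check in the spirit of the old test, assuming the four engines listed above are the complete MergeEngine enum:

    import org.apache.paimon.CoreOptions.MergeEngine
    import org.apache.paimon.spark.catalyst.analysis.Delete

    // every merge engine should now be accepted by the Delete operation
    assert(MergeEngine.values().forall(m => Delete.supportedMergeEngine.contains(m)))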
Changed file 3 of 6
@@ -19,9 +19,7 @@
package org.apache.paimon.spark.commands

import org.apache.paimon.CoreOptions
- import org.apache.paimon.options.Options
- import org.apache.paimon.partition.PartitionPredicate
- import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+ import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.spark.{InsertInto, SparkTable}
import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.catalyst.Compatibility
@@ -99,10 +97,10 @@ case class DeleteFromPaimonTableCommand(
writer.commit(Seq.empty)
}
} else {
- val commitMessages = if (withPrimaryKeys) {
-   performDeleteForPkTable(sparkSession)
+ val commitMessages = if (usePrimaryKeyDelete()) {
+   performPrimaryKeyDelete(sparkSession)
} else {
-   performDeleteForNonPkTable(sparkSession)
+   performDeleteCopyOnWrite(sparkSession)
}
writer.commit(commitMessages)
}
@@ -111,13 +109,17 @@
Seq.empty[Row]
}

- def performDeleteForPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+ def usePrimaryKeyDelete(): Boolean = {
+   withPrimaryKeys && table.coreOptions().mergeEngine() == MergeEngine.DEDUPLICATE
+ }
+
+ def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
val df = createDataset(sparkSession, Filter(condition, relation))
.withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
writer.write(df)
}

- def performDeleteForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+ def performDeleteCopyOnWrite(sparkSession: SparkSession): Seq[CommitMessage] = {
// Step1: the candidate data splits which are filtered by Paimon Predicate.
val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
val fileNameToMeta = candidateFileMap(candidateDataSplits)
Expand All @@ -142,7 +144,9 @@ case class DeleteFromPaimonTableCommand(
PaimonSplitScan(table, touchedDataSplits),
relation.output))
val data = createDataset(sparkSession, toRewriteScanRelation)
- val addCommitMessage = writer.write(data)
+
+ // only write new files; this rewrite should trigger no compaction
+ val addCommitMessage = writer.writeOnly().write(data)

// Step5: convert the deleted files that need to be written into commit messages.
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
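For reference, the primary-key path above (performPrimaryKeyDelete) encodes the delete by re-emitting the matched rows with a DELETE row kind, which the deduplicate merge engine then drops on read. A hedged sketch; toDeleteMarkers and the rowKindCol parameter are illustrative stand-ins for the command's ROW_KIND_COL handling:

    import org.apache.paimon.types.RowKind
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.lit

    // re-emit matched rows as delete markers; rowKindCol stands in for ROW_KIND_COL
    def toDeleteMarkers(matched: DataFrame, rowKindCol: String): DataFrame =
      matched.withColumn(rowKindCol, lit(RowKind.DELETE.toByteValue))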
Changed file 4 of 6
@@ -27,6 +27,7 @@ import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFile
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.RowType
+ import org.apache.paimon.utils.Preconditions

import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.SparkSession
@@ -95,6 +96,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
protected def findCandidateDataSplits(
condition: Expression,
output: Seq[Attribute]): Seq[DataSplit] = {
+ // low-level snapshot reader; it cannot be affected by 'scan.mode'
val snapshotReader = table.newSnapshotReader()
if (condition == TrueLiteral) {
val filter =
@@ -112,6 +114,14 @@
sparkSession: SparkSession): Array[String] = {
import sparkSession.implicits._

+ // only raw-convertible splits can generate input_file_name()
+ for (split <- candidateDataSplits) {
+   if (!split.rawConvertible()) {
+     throw new IllegalArgumentException(
+       "Only compacted table can generate touched files, please use 'COMPACT' procedure first.");
+   }
+ }
+
val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
val filteredRelation =
FilterLogicalNode(
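The guard above means that on a non-deduplicate primary-key table a DELETE only proceeds once every candidate split is raw convertible, i.e. fully compacted; otherwise it fails and points the user at the COMPACT procedure. A usage sketch matching the test changes further down (assumes a SparkSession with the Paimon catalog configured and a table T; compactThenDelete is an illustrative name):

    import org.apache.spark.sql.SparkSession

    def compactThenDelete(spark: SparkSession): Unit = {
      // after compaction every split is raw convertible, so the DELETE can
      // resolve the files it touches via input_file_name()
      spark.sql("CALL sys.compact(table => 'T')")
      spark.sql("DELETE FROM T WHERE id = 1")
    }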
Changed file 5 of 6
@@ -18,6 +18,8 @@

package org.apache.paimon.spark.commands

+ import org.apache.paimon.CoreOptions
+ import org.apache.paimon.CoreOptions.WRITE_ONLY
import org.apache.paimon.index.BucketAssigner
import org.apache.paimon.spark.SparkRow
import org.apache.paimon.spark.SparkUtils.createIOManager
@@ -32,6 +34,8 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._

import java.io.IOException
+ import java.util.Collections
+ import java.util.Collections.singletonMap

import scala.collection.JavaConverters._

@@ -54,6 +58,10 @@ case class PaimonSparkWriter(table: FileStoreTable) {

val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()

+ def writeOnly(): PaimonSparkWriter = {
+   PaimonSparkWriter(table.copy(singletonMap(WRITE_ONLY.key(), "true")))
+ }
+
def write(data: Dataset[_]): Seq[CommitMessage] = {
val sparkSession = data.sparkSession
import sparkSession.implicits._
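writeOnly() wraps a copy of the table with 'write-only' = 'true', so the copy-on-write delete only adds rewritten files and never schedules compaction while it runs. A minimal sketch of the same copy, assuming a FileStoreTable named table (asWriteOnly is an illustrative name):

    import java.util.Collections.singletonMap

    import org.apache.paimon.CoreOptions.WRITE_ONLY
    import org.apache.paimon.table.FileStoreTable

    // copy with compaction disabled; the original table object is left untouched
    def asWriteOnly(table: FileStoreTable): FileStoreTable =
      table.copy(singletonMap(WRITE_ONLY.key(), "true"))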
Changed file 6 of 6
@@ -19,9 +19,11 @@
package org.apache.paimon.spark.sql

import org.apache.paimon.CoreOptions
+ import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.catalyst.analysis.Delete

+ import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}

abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
@@ -176,23 +178,30 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
mergeEngine =>
{
test(s"test delete with merge engine $mergeEngine") {
- val options = if ("first-row".equals(mergeEngine.toString)) {
-   s"'primary-key' = 'id', 'merge-engine' = '$mergeEngine', 'changelog-producer' = 'lookup'"
- } else {
-   s"'primary-key' = 'id', 'merge-engine' = '$mergeEngine'"
- }
+ val otherOptions =
+   if ("first-row".equals(mergeEngine.toString)) "'changelog-producer' = 'lookup'," else ""
spark.sql(s"""
- |CREATE TABLE T (id INT, name STRING, dt STRING)
- |TBLPROPERTIES ($options)
+ |CREATE TABLE T (id INT, name STRING, age INT)
+ |TBLPROPERTIES (
+ |  $otherOptions
+ |  'primary-key' = 'id',
+ |  'merge-engine' = '$mergeEngine',
+ |  'write-only' = 'true')
|""".stripMargin)

- spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+ spark.sql("INSERT INTO T VALUES (1, 'a', NULL)")
+ spark.sql("INSERT INTO T VALUES (2, 'b', NULL)")
+ spark.sql("INSERT INTO T VALUES (1, NULL, 16)")

+ if (mergeEngine != MergeEngine.DEDUPLICATE) {
+   assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE id = 1"))
+     .hasMessageContaining("please use 'COMPACT' procedure first")
+   spark.sql("CALL sys.compact(table => 'T')")
+ }
+
- if (Delete.supportedMergeEngine.contains(mergeEngine)) {
-   spark.sql("DELETE FROM T WHERE name = 'a'")
- } else
-   assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE name = 'a'"))
-     .isInstanceOf(classOf[UnsupportedOperationException])
+ spark.sql("DELETE FROM T WHERE id = 1")
+ assertThat(spark.sql("SELECT * FROM T").collectAsList().toString)
+   .isEqualTo("[[2,b,null]]")
}
}
}
@@ -345,7 +354,6 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
spark.sql("DELETE FROM T WHERE hh = '12'")
assertThat(spark.sql("SELECT * FROM `T$audit_log` WHERE rowkind='-D'").collectAsList().size())
.isEqualTo(3)

}
}

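The unchanged context at the end of this test file also shows a handy way to verify what a DELETE removed: each deleted row surfaces with rowkind '-D' in the table's audit_log system table. A small usage sketch in the same style (assumes a SparkSession with the Paimon catalog and a table T; countDeletes is an illustrative name):

    import org.apache.spark.sql.SparkSession

    // count the delete records produced by earlier DELETE statements
    def countDeletes(spark: SparkSession): Long =
      spark.sql("SELECT * FROM `T$audit_log` WHERE rowkind = '-D'").count()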
