From 85c462e3d56576f9028438ec92e1f7772f18f192 Mon Sep 17 00:00:00 2001
From: Zouxxyy
Date: Fri, 20 Dec 2024 14:06:39 +0800
Subject: [PATCH] [spark] Fix delete with partial non-convertible partition filter (#4738)

---
 .../paimon/spark/SparkFilterConverter.java    | 20 ++++++++++++++--
 .../expressions/ExpressionHelper.scala        |  9 ++++---
 .../DeleteFromPaimonTableCommand.scala        | 24 +++++++++++--------
 .../paimon/spark/commands/PaimonCommand.scala |  2 +-
 .../spark/SparkFilterConverterTest.java       |  6 +++--
 .../spark/sql/DeleteFromTableTestBase.scala   | 10 ++++++++
 6 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
index 2ea2e3c45347..6b9375e5563c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
@@ -40,8 +40,11 @@
 import org.apache.spark.sql.sources.StringEndsWith;
 import org.apache.spark.sql.sources.StringStartsWith;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.predicate.PredicateBuilder.convertJavaObject;
@@ -75,11 +78,21 @@ public SparkFilterConverter(RowType rowType) {
         this.builder = new PredicateBuilder(rowType);
     }
 
+    @Nullable
     public Predicate convertIgnoreFailure(Filter filter) {
+        return convert(filter, true);
+    }
+
+    @Nullable
+    public Predicate convert(Filter filter, boolean ignoreFailure) {
         try {
             return convert(filter);
         } catch (Exception e) {
-            return null;
+            if (ignoreFailure) {
+                return null;
+            } else {
+                throw e;
+            }
         }
     }
 
@@ -139,7 +152,10 @@ public Predicate convert(Filter filter) {
             return PredicateBuilder.or(convert(or.left()), convert(or.right()));
         } else if (filter instanceof Not) {
             Not not = (Not) filter;
-            return convert(not.child()).negate().orElseThrow(UnsupportedOperationException::new);
+            Optional<Predicate> negate = convert(not.child()).negate();
+            if (negate.isPresent()) {
+                return negate.get();
+            }
         } else if (filter instanceof StringStartsWith) {
             StringStartsWith startsWith = (StringStartsWith) filter;
             int index = fieldIndex(startsWith.attribute());
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index d4010ea33811..2eef2c41aebe 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -167,25 +167,24 @@
       condition: Expression,
       output: Seq[Attribute],
       rowType: RowType,
-      ignoreFailure: Boolean = false): Option[Predicate] = {
+      ignorePartialFailure: Boolean = false): Option[Predicate] = {
     val converter = new SparkFilterConverter(rowType)
     val filters = normalizeExprs(Seq(condition), output)
       .flatMap(splitConjunctivePredicates(_).flatMap {
         f =>
           val filter = translateFilter(f, supportNestedPredicatePushdown = true)
-          if (filter.isEmpty && !ignoreFailure) {
+          if (filter.isEmpty && !ignorePartialFailure) {
             throw new RuntimeException(
               "Exec update failed:" +
                 s" cannot translate expression to source filter: $f")
           }
           filter
       })
-      .toArray
 
-    if (filters.isEmpty) {
+    val predicates = filters.map(converter.convert(_, ignorePartialFailure)).filter(_ != null)
+    if (predicates.isEmpty) {
       None
     } else {
-      val predicates = filters.map(converter.convertIgnoreFailure)
       Some(PredicateBuilder.and(predicates: _*))
     }
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 097823d730ce..799551aa6bb9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions.MergeEngine
+import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
@@ -65,11 +66,15 @@ case class DeleteFromPaimonTableCommand(
     val partitionPredicate = if (partitionCondition.isEmpty) {
       None
     } else {
-      convertConditionToPaimonPredicate(
-        partitionCondition.reduce(And),
-        relation.output,
-        table.schema.logicalPartitionType(),
-        ignoreFailure = true)
+      try {
+        convertConditionToPaimonPredicate(
+          partitionCondition.reduce(And),
+          relation.output,
+          table.schema.logicalPartitionType())
+      } catch {
+        case _: Throwable =>
+          None
+      }
     }
 
     if (
@@ -106,17 +111,17 @@ case class DeleteFromPaimonTableCommand(
     Seq.empty[Row]
   }
 
-  def usePrimaryKeyDelete(): Boolean = {
+  private def usePrimaryKeyDelete(): Boolean = {
     withPrimaryKeys && table.coreOptions().mergeEngine() == MergeEngine.DEDUPLICATE
   }
 
-  def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
+  private def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
     val df = createDataset(sparkSession, Filter(condition, relation))
       .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
     writer.write(df)
   }
 
-  def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
+  private def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
     val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
     val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
@@ -148,11 +153,10 @@ case class DeleteFromPaimonTableCommand(
       // only write new files, should have no compaction
       val addCommitMessage = writer.writeOnly().write(data)
 
-      // Step5: convert the deleted files that need to be wrote to commit message.
+      // Step5: convert the deleted files that need to be written to commit message.
       val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
 
       addCommitMessage ++ deletedCommitMessage
     }
   }
-
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 04118a438307..28ac1623fb59 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -101,7 +101,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
     }
     if (condition != TrueLiteral) {
       val filter =
-        convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true)
+        convertConditionToPaimonPredicate(condition, output, rowType, ignorePartialFailure = true)
       filter.foreach(snapshotReader.withFilter)
     }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
index 8ddb965697dc..ca976718f03b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
@@ -57,7 +57,7 @@
 
 import static org.apache.paimon.data.BinaryString.fromString;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.catchThrowableOfType;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link SparkFilterConverter}. */
 public class SparkFilterConverterTest {
@@ -225,7 +225,9 @@ public void testIgnoreFailure() {
         SparkFilterConverter converter = new SparkFilterConverter(rowType);
 
         Not not = Not.apply(StringStartsWith.apply("name", "paimon"));
-        catchThrowableOfType(() -> converter.convert(not), UnsupportedOperationException.class);
+        assertThatThrownBy(() -> converter.convert(not, false))
+                .hasMessageContaining("Not(StringStartsWith(name,paimon)) is unsupported.");
+        assertThat(converter.convert(not, true)).isNull();
         assertThat(converter.convertIgnoreFailure(not)).isNull();
     }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index aaaf5867e8b7..849e4cc98271 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -374,4 +374,14 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
       Row(1, "20240601") :: Nil
     )
   }
+
+  test(s"Paimon Delete: delete with non-convertible partition filter") {
+    sql("CREATE TABLE T (id INT, p STRING) PARTITIONED BY (p)")
+    sql("INSERT INTO T VALUES (2, '2024-12-16'), (3, '2024-12-17'), (4, '2024-12-18')")
+    sql("DELETE FROM T WHERE p >= date_sub('2024-12-17', 0) AND p < '2024-12-18'")
+    checkAnswer(
+      sql("SELECT * FROM T ORDER BY id"),
+      Seq(Row(2, "2024-12-16"), Row(4, "2024-12-18"))
+    )
+  }
 }