[spark] Fix delete with partial non-convertible partition filter (#4738)
Zouxxyy authored Dec 20, 2024
1 parent 3a9bed2 commit 85c462e
Showing 6 changed files with 51 additions and 20 deletions.
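The scenario behind the fix, taken from the regression test added at the end of this commit: only part of the DELETE's partition filter can be converted into a Paimon predicate (the date_sub comparison cannot), so conversion has to succeed partially instead of failing the whole statement. A minimal sketch, assuming a Spark session with a Paimon catalog where sql(...) stands for spark.sql(...):

// Mirrors the new DeleteFromTableTestBase case at the bottom of this commit.
sql("CREATE TABLE T (id INT, p STRING) PARTITIONED BY (p)")
sql("INSERT INTO T VALUES (2, '2024-12-16'), (3, '2024-12-17'), (4, '2024-12-18')")
// The date_sub conjunct does not convert to a Paimon partition predicate; p < '2024-12-18' does.
sql("DELETE FROM T WHERE p >= date_sub('2024-12-17', 0) AND p < '2024-12-18'")
// Expected survivors: (2, '2024-12-16') and (4, '2024-12-18').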
@@ -40,8 +40,11 @@
import org.apache.spark.sql.sources.StringEndsWith;
import org.apache.spark.sql.sources.StringStartsWith;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.paimon.predicate.PredicateBuilder.convertJavaObject;
@@ -75,11 +78,21 @@ public SparkFilterConverter(RowType rowType) {
this.builder = new PredicateBuilder(rowType);
}

@Nullable
public Predicate convertIgnoreFailure(Filter filter) {
return convert(filter, true);
}

@Nullable
public Predicate convert(Filter filter, boolean ignoreFailure) {
try {
return convert(filter);
} catch (Exception e) {
- return null;
+ if (ignoreFailure) {
+ return null;
+ } else {
+ throw e;
+ }
}
}

@@ -139,7 +152,10 @@ public Predicate convert(Filter filter) {
return PredicateBuilder.or(convert(or.left()), convert(or.right()));
} else if (filter instanceof Not) {
Not not = (Not) filter;
- return convert(not.child()).negate().orElseThrow(UnsupportedOperationException::new);
+ Optional<Predicate> negate = convert(not.child()).negate();
+ if (negate.isPresent()) {
+ return negate.get();
+ }
} else if (filter instanceof StringStartsWith) {
StringStartsWith startsWith = (StringStartsWith) filter;
int index = fieldIndex(startsWith.attribute());
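With these changes, the converter's two entry points differ only in how a conversion failure surfaces. A minimal usage sketch (rowType is assumed to be built elsewhere; the unsupported filter is the one from the updated SparkFilterConverterTest below):

import org.apache.spark.sql.sources.{Not, StringStartsWith}

val converter = new SparkFilterConverter(rowType)
val unsupported = Not(StringStartsWith("name", "paimon"))

converter.convertIgnoreFailure(unsupported) // returns null (hence the new @Nullable annotations)
converter.convert(unsupported, true)        // same: swallows the failure and returns null
converter.convert(unsupported, false)       // rethrows, e.g. "Not(StringStartsWith(name,paimon)) is unsupported."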
@@ -167,25 +167,24 @@ trait ExpressionHelper extends PredicateHelper {
condition: Expression,
output: Seq[Attribute],
rowType: RowType,
- ignoreFailure: Boolean = false): Option[Predicate] = {
+ ignorePartialFailure: Boolean = false): Option[Predicate] = {
val converter = new SparkFilterConverter(rowType)
val filters = normalizeExprs(Seq(condition), output)
.flatMap(splitConjunctivePredicates(_).flatMap {
f =>
val filter = translateFilter(f, supportNestedPredicatePushdown = true)
- if (filter.isEmpty && !ignoreFailure) {
+ if (filter.isEmpty && !ignorePartialFailure) {
throw new RuntimeException(
"Exec update failed:" +
s" cannot translate expression to source filter: $f")
}
filter
})
.toArray

- if (filters.isEmpty) {
+ val predicates = filters.map(converter.convert(_, ignorePartialFailure)).filter(_ != null)
+ if (predicates.isEmpty) {
None
} else {
- val predicates = filters.map(converter.convertIgnoreFailure)
Some(PredicateBuilder.and(predicates: _*))
}
}
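The net effect of the helper change is per-conjunct conversion: a conjunct that fails to convert becomes null and is dropped, and only the convertible remainder is and-ed into the pushed-down predicate. A hedged sketch of what that means for the DELETE condition from the new test, assuming Spark splits it into two translated filters (translatedFilters is a hypothetical name for the array produced by the translate step above):

// p >= date_sub('2024-12-17', 0) AND p < '2024-12-18' arrives as two conjuncts;
// suppose only the second survives Paimon conversion.
val predicates = translatedFilters
  .map(converter.convert(_, ignorePartialFailure)) // non-convertible conjunct -> null
  .filter(_ != null)                               // drop it instead of failing
val pushed: Option[Predicate] =
  if (predicates.isEmpty) None
  else Some(PredicateBuilder.and(predicates: _*))  // here effectively just p < '2024-12-18'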
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.commands

import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
@@ -65,11 +66,15 @@ case class DeleteFromPaimonTableCommand(
val partitionPredicate = if (partitionCondition.isEmpty) {
None
} else {
- convertConditionToPaimonPredicate(
- partitionCondition.reduce(And),
- relation.output,
- table.schema.logicalPartitionType(),
- ignoreFailure = true)
+ try {
+ convertConditionToPaimonPredicate(
+ partitionCondition.reduce(And),
+ relation.output,
+ table.schema.logicalPartitionType())
+ } catch {
+ case _: Throwable =>
+ None
+ }
}

if (
@@ -106,17 +111,17 @@ case class DeleteFromPaimonTableCommand(
Seq.empty[Row]
}

- def usePrimaryKeyDelete(): Boolean = {
+ private def usePrimaryKeyDelete(): Boolean = {
withPrimaryKeys && table.coreOptions().mergeEngine() == MergeEngine.DEDUPLICATE
}

- def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
+ private def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
val df = createDataset(sparkSession, Filter(condition, relation))
.withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
writer.write(df)
}

- def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
+ private def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
// Step1: the candidate data splits which are filtered by Paimon Predicate.
val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
@@ -148,11 +153,10 @@ case class DeleteFromPaimonTableCommand(
// only write new files, should have no compaction
val addCommitMessage = writer.writeOnly().write(data)

- // Step5: convert the deleted files that need to be wrote to commit message.
+ // Step5: convert the deleted files that need to be written to commit message.
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)

addCommitMessage ++ deletedCommitMessage
}
}

}
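The command-side change is stricter on purpose: the partition shortcut is only useful when the whole partition condition converts, so any conversion failure now simply yields no partition predicate (and the delete presumably proceeds through the ordinary rewrite path) rather than bubbling up. Condensed from the hunk above:

// Condensed sketch of the new fallback, names as in the diff.
val partitionPredicate: Option[Predicate] =
  if (partitionCondition.isEmpty) None
  else
    try {
      convertConditionToPaimonPredicate(
        partitionCondition.reduce(And),
        relation.output,
        table.schema.logicalPartitionType())
    } catch {
      case _: Throwable => None // partial or non-convertible filter: skip the shortcut
    }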
@@ -101,7 +101,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
}
if (condition != TrueLiteral) {
val filter =
- convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true)
+ convertConditionToPaimonPredicate(condition, output, rowType, ignorePartialFailure = true)
filter.foreach(snapshotReader.withFilter)
}

@@ -57,7 +57,7 @@

import static org.apache.paimon.data.BinaryString.fromString;
import static org.assertj.core.api.Assertions.assertThat;
- import static org.assertj.core.api.Assertions.catchThrowableOfType;
+ import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link SparkFilterConverter}. */
public class SparkFilterConverterTest {
@@ -225,7 +225,9 @@ public void testIgnoreFailure() {
SparkFilterConverter converter = new SparkFilterConverter(rowType);

Not not = Not.apply(StringStartsWith.apply("name", "paimon"));
- catchThrowableOfType(() -> converter.convert(not), UnsupportedOperationException.class);
+ assertThatThrownBy(() -> converter.convert(not, false))
+ .hasMessageContaining("Not(StringStartsWith(name,paimon)) is unsupported.");
+ assertThat(converter.convert(not, true)).isNull();
assertThat(converter.convertIgnoreFailure(not)).isNull();
}
}
@@ -374,4 +374,14 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
Row(1, "20240601") :: Nil
)
}

test(s"Paimon Delete: delete with non-convertible partition filter") {
sql("CREATE TABLE T (id INT, p STRING) PARTITIONED BY (p)")
sql("INSERT INTO T VALUES (2, '2024-12-16'), (3, '2024-12-17'), (4, '2024-12-18')")
sql("DELETE FROM T WHERE p >= date_sub('2024-12-17', 0) AND p < '2024-12-18'")
checkAnswer(
sql("SELECT * FROM T ORDER BY id"),
Seq(Row(2, "2024-12-16"), Row(4, "2024-12-18"))
)
}
}
