Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spark] Fix delete with partial non-convertible partition filter #4738

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@
import org.apache.spark.sql.sources.StringEndsWith;
import org.apache.spark.sql.sources.StringStartsWith;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.paimon.predicate.PredicateBuilder.convertJavaObject;
Expand Down Expand Up @@ -75,11 +78,21 @@ public SparkFilterConverter(RowType rowType) {
this.builder = new PredicateBuilder(rowType);
}

@Nullable
public Predicate convertIgnoreFailure(Filter filter) {
return convert(filter, true);
}

@Nullable
public Predicate convert(Filter filter, boolean ignoreFailure) {
try {
return convert(filter);
} catch (Exception e) {
return null;
if (ignoreFailure) {
return null;
} else {
throw e;
}
}
}

Expand Down Expand Up @@ -139,7 +152,10 @@ public Predicate convert(Filter filter) {
return PredicateBuilder.or(convert(or.left()), convert(or.right()));
} else if (filter instanceof Not) {
Not not = (Not) filter;
return convert(not.child()).negate().orElseThrow(UnsupportedOperationException::new);
Optional<Predicate> negate = convert(not.child()).negate();
if (negate.isPresent()) {
return negate.get();
}
} else if (filter instanceof StringStartsWith) {
StringStartsWith startsWith = (StringStartsWith) filter;
int index = fieldIndex(startsWith.attribute());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,24 @@ trait ExpressionHelper extends PredicateHelper {
condition: Expression,
output: Seq[Attribute],
rowType: RowType,
ignoreFailure: Boolean = false): Option[Predicate] = {
ignorePartialFailure: Boolean = false): Option[Predicate] = {
val converter = new SparkFilterConverter(rowType)
val filters = normalizeExprs(Seq(condition), output)
.flatMap(splitConjunctivePredicates(_).flatMap {
f =>
val filter = translateFilter(f, supportNestedPredicatePushdown = true)
if (filter.isEmpty && !ignoreFailure) {
if (filter.isEmpty && !ignorePartialFailure) {
throw new RuntimeException(
"Exec update failed:" +
s" cannot translate expression to source filter: $f")
}
filter
})
.toArray

if (filters.isEmpty) {
val predicates = filters.map(converter.convert(_, ignorePartialFailure)).filter(_ != null)
if (predicates.isEmpty) {
None
} else {
val predicates = filters.map(converter.convertIgnoreFailure)
Some(PredicateBuilder.and(predicates: _*))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.spark.commands

import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
Expand Down Expand Up @@ -65,11 +66,15 @@ case class DeleteFromPaimonTableCommand(
val partitionPredicate = if (partitionCondition.isEmpty) {
None
} else {
convertConditionToPaimonPredicate(
partitionCondition.reduce(And),
relation.output,
table.schema.logicalPartitionType(),
ignoreFailure = true)
try {
convertConditionToPaimonPredicate(
partitionCondition.reduce(And),
relation.output,
table.schema.logicalPartitionType())
} catch {
case _: Throwable =>
None
}
}

if (
Expand Down Expand Up @@ -106,17 +111,17 @@ case class DeleteFromPaimonTableCommand(
Seq.empty[Row]
}

def usePrimaryKeyDelete(): Boolean = {
private def usePrimaryKeyDelete(): Boolean = {
withPrimaryKeys && table.coreOptions().mergeEngine() == MergeEngine.DEDUPLICATE
}

def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
private def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
val df = createDataset(sparkSession, Filter(condition, relation))
.withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
writer.write(df)
}

def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
private def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
// Step1: the candidate data splits which are filtered by Paimon Predicate.
val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
Expand Down Expand Up @@ -148,11 +153,10 @@ case class DeleteFromPaimonTableCommand(
// only write new files, should have no compaction
val addCommitMessage = writer.writeOnly().write(data)

// Step5: convert the deleted files that need to be wrote to commit message.
// Step5: convert the deleted files that need to be written to commit message.
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)

addCommitMessage ++ deletedCommitMessage
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
}
if (condition != TrueLiteral) {
val filter =
convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true)
convertConditionToPaimonPredicate(condition, output, rowType, ignorePartialFailure = true)
filter.foreach(snapshotReader.withFilter)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

import static org.apache.paimon.data.BinaryString.fromString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowableOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link SparkFilterConverter}. */
public class SparkFilterConverterTest {
Expand Down Expand Up @@ -225,7 +225,9 @@ public void testIgnoreFailure() {
SparkFilterConverter converter = new SparkFilterConverter(rowType);

Not not = Not.apply(StringStartsWith.apply("name", "paimon"));
catchThrowableOfType(() -> converter.convert(not), UnsupportedOperationException.class);
assertThatThrownBy(() -> converter.convert(not, false))
.hasMessageContaining("Not(StringStartsWith(name,paimon)) is unsupported.");
assertThat(converter.convert(not, true)).isNull();
assertThat(converter.convertIgnoreFailure(not)).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,14 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
Row(1, "20240601") :: Nil
)
}

test(s"Paimon Delete: delete with non-convertible partition filter") {
sql("CREATE TABLE T (id INT, p STRING) PARTITIONED BY (p)")
sql("INSERT INTO T VALUES (2, '2024-12-16'), (3, '2024-12-17'), (4, '2024-12-18')")
sql("DELETE FROM T WHERE p >= date_sub('2024-12-17', 0) AND p < '2024-12-18'")
checkAnswer(
sql("SELECT * FROM T ORDER BY id"),
Seq(Row(2, "2024-12-16"), Row(4, "2024-12-18"))
)
}
}
Loading