
Commit

[spark] Minimize the number of data splits that need to be scanned when updating (#3147)
YannByron authored Apr 2, 2024
1 parent b8884f8 commit e4ca384
Showing 7 changed files with 58 additions and 23 deletions.
@@ -71,6 +71,14 @@ public SparkFilterConverter(RowType rowType) {
this.builder = new PredicateBuilder(rowType);
}

public Predicate convertIgnoreFailure(Filter filter) {
try {
return convert(filter);
} catch (Exception e) {
return null;
}
}

public Predicate convert(Filter filter) {
if (filter instanceof EqualTo) {
EqualTo eq = (EqualTo) filter;
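
The new convertIgnoreFailure wraps convert so that a Spark Filter the converter cannot handle is reported as null ("not pushable") instead of an exception the caller has to catch. A minimal Scala sketch, mirroring the new test at the end of this commit (the name column is illustrative):

    val converter = new SparkFilterConverter(rowType)
    val unsupported = Not(StringStartsWith("name", "paimon"))
    // convert(unsupported) throws UnsupportedOperationException;
    // convertIgnoreFailure(unsupported) returns null, meaning "keep this filter on the Spark side".
    val predicate = converter.convertIgnoreFailure(unsupported)
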
@@ -189,7 +189,8 @@ private boolean execute(
condition == null
? null
: ExpressionUtils.convertConditionToPaimonPredicate(
condition, relation.output(), table.rowType());
condition, relation.output(), table.rowType(), false)
.getOrElse(null);
switch (bucketMode) {
case FIXED:
case DYNAMIC:
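
Because convertConditionToPaimonPredicate now returns an Option, this call site unwraps the result with getOrElse(null) so the rest of the method keeps its previous nullable-predicate behavior. In Scala terms the pattern is roughly (names mirror the diff, not the full surrounding method):

    val deletePredicate =
      if (condition == null) null
      else
        ExpressionUtils
          .convertConditionToPaimonPredicate(condition, relation.output, table.rowType(), false) // ignoreFailure = false
          .getOrElse(null)
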
@@ -60,18 +60,16 @@ abstract class PaimonBaseScanBuilder(table: Table)
val visitor = new PartitionPredicateVisitor(table.partitionKeys())
filters.foreach {
filter =>
try {
val predicate = converter.convert(filter)
val predicate = converter.convertIgnoreFailure(filter)
if (predicate == null) {
postScan.append(filter)
} else {
pushable.append((filter, predicate))
if (!predicate.visit(visitor)) {
postScan.append(filter)
} else {
if (predicate.visit(visitor)) {
reserved.append(filter)
}
} catch {
case e: UnsupportedOperationException =>
logWarning(e.getMessage)
} else {
postScan.append(filter)
}
}
}
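
After this change the loop classifies each Spark filter into three buckets without a try/catch: pushable holds filters that convert to a Paimon Predicate (pushed into the scan), reserved holds the subset that only touches partition keys (Spark need not re-evaluate them), and postScan holds everything Spark must still evaluate after the scan. Read back as a whole, the new loop is roughly (assuming the buffers and visitor defined earlier in the class):

    filters.foreach { filter =>
      val predicate = converter.convertIgnoreFailure(filter)   // null when the filter is unsupported
      if (predicate == null) {
        postScan.append(filter)                                // cannot push down; Spark filters after the scan
      } else {
        pushable.append((filter, predicate))                   // pushed into the Paimon scan
        if (predicate.visit(visitor)) {
          reserved.append(filter)                              // partition-only predicate, handled by pruning
        } else {
          postScan.append(filter)                              // pushed down but still re-checked by Spark
        }
      }
    }
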

@@ -138,18 +138,28 @@ trait ExpressionHelper extends PredicateHelper {
def convertConditionToPaimonPredicate(
condition: Expression,
output: Seq[Attribute],
rowType: RowType): Predicate = {
rowType: RowType,
ignoreFailure: Boolean = false): Option[Predicate] = {
val converter = new SparkFilterConverter(rowType)
val filters = normalizeExprs(Seq(condition), output)
.flatMap(splitConjunctivePredicates(_).map {
.flatMap(splitConjunctivePredicates(_).flatMap {
f =>
translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
throw new RuntimeException("Exec update failed:" +
s" cannot translate expression to source filter: $f"))
val filter = translateFilter(f, supportNestedPredicatePushdown = true)
if (filter.isEmpty && !ignoreFailure) {
throw new RuntimeException(
"Exec update failed:" +
s" cannot translate expression to source filter: $f")
}
filter
})
.toArray
val predicates = filters.map(converter.convert)
PredicateBuilder.and(predicates: _*)

if (filters.isEmpty) {
None
} else {
val predicates = filters.map(converter.convertIgnoreFailure)
Some(PredicateBuilder.and(predicates: _*))
}
}
}
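
The helper's contract changes in two ways: an ignoreFailure flag controls whether an untranslatable conjunct aborts the conversion or is silently dropped, and the result becomes Option[Predicate] because dropping conjuncts can leave nothing to push down. A minimal sketch of the two calling modes, from code that mixes in ExpressionHelper (condition, output and rowType are whatever the caller already has):

    // Strict (default): throws if any conjunct cannot be translated to a source filter.
    val strict: Option[Predicate] =
      convertConditionToPaimonPredicate(condition, output, rowType)

    // Lenient: untranslatable conjuncts are dropped; None means "no usable filter at all".
    val lenient: Option[Predicate] =
      convertConditionToPaimonPredicate(condition, output, rowType, ignoreFailure = true)

In the callers touched by this commit the original condition is still applied afterwards, so dropping untranslatable conjuncts only widens the pruning filter; it does not change results.
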

@@ -51,9 +51,7 @@ trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with Pa
(None, false)
} else {
try {
(
Some(convertConditionToPaimonPredicate(condition(), relation.output, table.rowType())),
false)
(convertConditionToPaimonPredicate(condition(), relation.output, table.rowType()), false)
} catch {
case NonFatal(_) =>
(None, true)
@@ -158,9 +158,10 @@ case class UpdatePaimonTableCommand(

private def findCandidateDataSplits(): Seq[DataSplit] = {
val snapshotReader = table.newSnapshotReader()
if (condition == TrueLiteral) {
val filter = convertConditionToPaimonPredicate(condition, relation.output, rowType)
snapshotReader.withFilter(filter)
if (condition != TrueLiteral) {
val filter =
convertConditionToPaimonPredicate(condition, relation.output, rowType, ignoreFailure = true)
filter.foreach(snapshotReader.withFilter)
}

snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
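
This is the change behind the commit title: the old code pushed a filter only when the condition was TrueLiteral, so a real WHERE clause never pruned anything and every UPDATE scanned all splits. Now, for any real condition, the predicate is converted with ignoreFailure = true and pushed into the snapshot reader, so split pruning via partition values and file statistics keeps only candidate splits. Roughly, for an UPDATE with a pushable condition (the dt column is illustrative):

    // UPDATE t SET ... WHERE dt = '2024-04-02'
    // condition != TrueLiteral, so a Paimon predicate for `dt = '2024-04-02'` is pushed down:
    val filter = convertConditionToPaimonPredicate(condition, relation.output, rowType, ignoreFailure = true)
    filter.foreach(snapshotReader.withFilter)  // prunes splits before reading
    val candidates = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
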
@@ -26,6 +26,7 @@
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;

import org.apache.spark.sql.sources.EqualNullSafe;
import org.apache.spark.sql.sources.EqualTo;
@@ -36,17 +37,22 @@
import org.apache.spark.sql.sources.IsNull;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.LessThanOrEqual;
import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.StringStartsWith;
import org.junit.jupiter.api.Test;

import java.sql.Date;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowableOfType;

/** Test for {@link SparkFilterConverter}. */
public class SparkFilterConverterTest {
@@ -183,4 +189,17 @@ public void testDate() {
assertThat(dateExpression).isEqualTo(rawExpression);
assertThat(localDateExpression).isEqualTo(rawExpression);
}

@Test
public void testIgnoreFailure() {
List<DataField> dataFields = new ArrayList<>();
dataFields.add(new DataField(0, "id", new IntType()));
dataFields.add(new DataField(1, "name", new VarCharType(VarCharType.MAX_LENGTH)));
RowType rowType = new RowType(dataFields);
SparkFilterConverter converter = new SparkFilterConverter(rowType);

Not not = Not.apply(StringStartsWith.apply("name", "paimon"));
catchThrowableOfType(() -> converter.convert(not), UnsupportedOperationException.class);
assertThat(converter.convertIgnoreFailure(not)).isNull();
}
}
