From e4ca384c69e8fbeb2112af842f9bbb1023fc8c59 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Tue, 2 Apr 2024 20:30:01 +0800 Subject: [PATCH] [spark] Minimize the number of data splits need to be scaned when update (#3147) --- .../paimon/spark/SparkFilterConverter.java | 8 +++++++ .../spark/procedure/CompactProcedure.java | 3 ++- .../paimon/spark/PaimonBaseScanBuilder.scala | 16 ++++++------- .../expressions/ExpressionHelper.scala | 24 +++++++++++++------ .../DeleteFromPaimonTableCommand.scala | 4 +--- .../commands/UpdatePaimonTableCommand.scala | 7 +++--- .../spark/SparkFilterConverterTest.java | 19 +++++++++++++++ 7 files changed, 58 insertions(+), 23 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java index ddaedd322b61..b600a0275268 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java @@ -71,6 +71,14 @@ public SparkFilterConverter(RowType rowType) { this.builder = new PredicateBuilder(rowType); } + public Predicate convertIgnoreFailure(Filter filter) { + try { + return convert(filter); + } catch (Exception e) { + return null; + } + } + public Predicate convert(Filter filter) { if (filter instanceof EqualTo) { EqualTo eq = (EqualTo) filter; diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 6396336442c8..1c5662025723 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -189,7 +189,8 @@ private boolean execute( condition == null ? null : ExpressionUtils.convertConditionToPaimonPredicate( - condition, relation.output(), table.rowType()); + condition, relation.output(), table.rowType(), false) + .getOrElse(null); switch (bucketMode) { case FIXED: case DYNAMIC: diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala index 3f49ab0f7bdf..0efe14552afe 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala @@ -60,18 +60,16 @@ abstract class PaimonBaseScanBuilder(table: Table) val visitor = new PartitionPredicateVisitor(table.partitionKeys()) filters.foreach { filter => - try { - val predicate = converter.convert(filter) + val predicate = converter.convertIgnoreFailure(filter) + if (predicate == null) { + postScan.append(filter) + } else { pushable.append((filter, predicate)) - if (!predicate.visit(visitor)) { - postScan.append(filter) - } else { + if (predicate.visit(visitor)) { reserved.append(filter) - } - } catch { - case e: UnsupportedOperationException => - logWarning(e.getMessage) + } else { postScan.append(filter) + } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala index 3e09557d564b..8657c70ad093 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala @@ -138,18 +138,28 @@ trait ExpressionHelper extends PredicateHelper { def convertConditionToPaimonPredicate( condition: Expression, output: Seq[Attribute], - rowType: RowType): Predicate = { + rowType: RowType, + ignoreFailure: Boolean = false): Option[Predicate] = { val converter = new SparkFilterConverter(rowType) val filters = normalizeExprs(Seq(condition), output) - .flatMap(splitConjunctivePredicates(_).map { + .flatMap(splitConjunctivePredicates(_).flatMap { f => - translateFilter(f, supportNestedPredicatePushdown = true).getOrElse( - throw new RuntimeException("Exec update failed:" + - s" cannot translate expression to source filter: $f")) + val filter = translateFilter(f, supportNestedPredicatePushdown = true) + if (filter.isEmpty && !ignoreFailure) { + throw new RuntimeException( + "Exec update failed:" + + s" cannot translate expression to source filter: $f") + } + filter }) .toArray - val predicates = filters.map(converter.convert) - PredicateBuilder.and(predicates: _*) + + if (filters.isEmpty) { + None + } else { + val predicates = filters.map(converter.convertIgnoreFailure) + Some(PredicateBuilder.and(predicates: _*)) + } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala index e4bf22d0fba2..84922abbc967 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -51,9 +51,7 @@ trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with Pa (None, false) } else { try { - ( - Some(convertConditionToPaimonPredicate(condition(), relation.output, table.rowType())), - false) + (convertConditionToPaimonPredicate(condition(), relation.output, table.rowType()), false) } catch { case NonFatal(_) => (None, true) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala index 3bc236a868f9..1c9fef77c783 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala @@ -158,9 +158,10 @@ case class UpdatePaimonTableCommand( private def findCandidateDataSplits(): Seq[DataSplit] = { val snapshotReader = table.newSnapshotReader() - if (condition == TrueLiteral) { - val filter = convertConditionToPaimonPredicate(condition, relation.output, rowType) - snapshotReader.withFilter(filter) + if (condition != TrueLiteral) { + val filter = + convertConditionToPaimonPredicate(condition, relation.output, rowType, ignoreFailure = true) + filter.foreach(snapshotReader.withFilter) } snapshotReader.read().splits().asScala.collect { case s: DataSplit => s } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java index 2cd1726af7a8..3778d4575115 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.VarCharType; import org.apache.spark.sql.sources.EqualNullSafe; import org.apache.spark.sql.sources.EqualTo; @@ -36,6 +37,8 @@ import org.apache.spark.sql.sources.IsNull; import org.apache.spark.sql.sources.LessThan; import org.apache.spark.sql.sources.LessThanOrEqual; +import org.apache.spark.sql.sources.Not; +import org.apache.spark.sql.sources.StringStartsWith; import org.junit.jupiter.api.Test; import java.sql.Date; @@ -43,10 +46,13 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowableOfType; /** Test for {@link SparkFilterConverter}. */ public class SparkFilterConverterTest { @@ -183,4 +189,17 @@ public void testDate() { assertThat(dateExpression).isEqualTo(rawExpression); assertThat(localDateExpression).isEqualTo(rawExpression); } + + @Test + public void testIgnoreFailure() { + List dataFields = new ArrayList<>(); + dataFields.add(new DataField(0, "id", new IntType())); + dataFields.add(new DataField(1, "name", new VarCharType(VarCharType.MAX_LENGTH))); + RowType rowType = new RowType(dataFields); + SparkFilterConverter converter = new SparkFilterConverter(rowType); + + Not not = Not.apply(StringStartsWith.apply("name", "paimon")); + catchThrowableOfType(() -> converter.convert(not), UnsupportedOperationException.class); + assertThat(converter.convertIgnoreFailure(not)).isNull(); + } }