From e8ef382267c63825d4ccd6bb9279636ff456e0a0 Mon Sep 17 00:00:00 2001 From: Yann Date: Tue, 2 Apr 2024 17:56:44 +0800 Subject: [PATCH] [spark][followup] solve comments --- .../paimon/spark/SparkFilterConverter.java | 8 ++++++++ .../paimon/spark/PaimonBaseScanBuilder.scala | 16 +++++++--------- .../expressions/ExpressionHelper.scala | 2 +- .../spark/SparkFilterConverterTest.java | 19 +++++++++++++++++++ 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java index ddaedd322b61..b600a0275268 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java @@ -71,6 +71,14 @@ public SparkFilterConverter(RowType rowType) { this.builder = new PredicateBuilder(rowType); } + public Predicate convertIgnoreFailure(Filter filter) { + try { + return convert(filter); + } catch (Exception e) { + return null; + } + } + public Predicate convert(Filter filter) { if (filter instanceof EqualTo) { EqualTo eq = (EqualTo) filter; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala index 3f49ab0f7bdf..0efe14552afe 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala @@ -60,18 +60,16 @@ abstract class PaimonBaseScanBuilder(table: Table) val visitor = new PartitionPredicateVisitor(table.partitionKeys()) filters.foreach { filter => - try { - val predicate = converter.convert(filter) + val predicate = 
converter.convertIgnoreFailure(filter) + if (predicate == null) { + postScan.append(filter) + } else { pushable.append((filter, predicate)) - if (!predicate.visit(visitor)) { - postScan.append(filter) - } else { + if (predicate.visit(visitor)) { reserved.append(filter) - } - } catch { - case e: UnsupportedOperationException => - logWarning(e.getMessage) + } else { postScan.append(filter) + } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala index 4510fc315a55..8657c70ad093 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala @@ -157,7 +157,7 @@ trait ExpressionHelper extends PredicateHelper { if (filters.isEmpty) { None } else { - val predicates = filters.map(converter.convert) + val predicates = filters.map(converter.convertIgnoreFailure) Some(PredicateBuilder.and(predicates: _*)) } } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java index 2cd1726af7a8..3778d4575115 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.VarCharType; import org.apache.spark.sql.sources.EqualNullSafe; import org.apache.spark.sql.sources.EqualTo; @@ -36,6 
+37,8 @@ import org.apache.spark.sql.sources.IsNull; import org.apache.spark.sql.sources.LessThan; import org.apache.spark.sql.sources.LessThanOrEqual; +import org.apache.spark.sql.sources.Not; +import org.apache.spark.sql.sources.StringStartsWith; import org.junit.jupiter.api.Test; import java.sql.Date; @@ -43,10 +46,13 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowableOfType; /** Test for {@link SparkFilterConverter}. */ public class SparkFilterConverterTest { @@ -183,4 +189,17 @@ public void testDate() { assertThat(dateExpression).isEqualTo(rawExpression); assertThat(localDateExpression).isEqualTo(rawExpression); } + + @Test + public void testIgnoreFailure() { + List<DataField> dataFields = new ArrayList<>(); + dataFields.add(new DataField(0, "id", new IntType())); + dataFields.add(new DataField(1, "name", new VarCharType(VarCharType.MAX_LENGTH))); + RowType rowType = new RowType(dataFields); + SparkFilterConverter converter = new SparkFilterConverter(rowType); + + Not not = Not.apply(StringStartsWith.apply("name", "paimon")); + catchThrowableOfType(() -> converter.convert(not), UnsupportedOperationException.class); + assertThat(converter.convertIgnoreFailure(not)).isNull(); + } }