
Commit

[spark][followup] solve comments
YannByron committed Apr 2, 2024
1 parent 5fbeca5 commit e8ef382
Showing 4 changed files with 35 additions and 10 deletions.
SparkFilterConverter.java
@@ -71,6 +71,14 @@ public SparkFilterConverter(RowType rowType) {
         this.builder = new PredicateBuilder(rowType);
     }
 
+    public Predicate convertIgnoreFailure(Filter filter) {
+        try {
+            return convert(filter);
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
     public Predicate convert(Filter filter) {
         if (filter instanceof EqualTo) {
             EqualTo eq = (EqualTo) filter;
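
The new method gives callers a non-throwing variant of convert: a filter the converter cannot translate yields null instead of an exception. A minimal Java usage sketch; the schema and filter values are hypothetical, only convert/convertIgnoreFailure and the Paimon/Spark types come from this commit:

    // Hypothetical schema: id INT, name STRING.
    RowType rowType =
            new RowType(
                    Arrays.asList(
                            new DataField(0, "id", new IntType()),
                            new DataField(1, "name", new VarCharType(VarCharType.MAX_LENGTH))));
    SparkFilterConverter converter = new SparkFilterConverter(rowType);

    // A supported filter converts as before...
    Predicate p1 = converter.convertIgnoreFailure(EqualTo.apply("id", 1));
    // ...while an unsupported one (Not(StringStartsWith)) now yields null instead of throwing.
    Predicate p2 =
            converter.convertIgnoreFailure(Not.apply(StringStartsWith.apply("name", "paimon")));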
PaimonBaseScanBuilder.scala
@@ -60,18 +60,16 @@ abstract class PaimonBaseScanBuilder(table: Table)
     val visitor = new PartitionPredicateVisitor(table.partitionKeys())
     filters.foreach {
       filter =>
-        try {
-          val predicate = converter.convert(filter)
-          pushable.append((filter, predicate))
-          if (!predicate.visit(visitor)) {
-            postScan.append(filter)
-          } else {
-            reserved.append(filter)
-          }
-        } catch {
-          case e: UnsupportedOperationException =>
-            logWarning(e.getMessage)
-            postScan.append(filter)
+        val predicate = converter.convertIgnoreFailure(filter)
+        if (predicate == null) {
+          postScan.append(filter)
+        } else {
+          pushable.append((filter, predicate))
+          if (predicate.visit(visitor)) {
+            reserved.append(filter)
+          } else {
+            postScan.append(filter)
+          }
         }
     }
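
Net effect of the rewrite: the per-filter try/catch is folded into convertIgnoreFailure, and a null result becomes the signal for "cannot push down". Unconvertible filters go straight to postScan; convertible ones are appended to pushable, and only those accepted by the PartitionPredicateVisitor (predicates that, going by the visitor's name, reference only partition keys) are also reserved, everything else staying in postScan for re-evaluation after the scan. One observable difference: the UnsupportedOperationException message is no longer logged, since convertIgnoreFailure swallows the exception.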

ExpressionHelper.scala
@@ -157,7 +157,7 @@ trait ExpressionHelper extends PredicateHelper {
     if (filters.isEmpty) {
       None
     } else {
-      val predicates = filters.map(converter.convert)
+      val predicates = filters.map(converter.convertIgnoreFailure)
       Some(PredicateBuilder.and(predicates: _*))
     }
   }
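
For reference, the same conversion step sketched from Java (a loose sketch reusing the hypothetical rowType from the earlier example; the filters array is made up, and convertIgnoreFailure maps any conversion failure to null):

    // Hypothetical inputs; PredicateBuilder.and mirrors the Scala varargs call above.
    Filter[] filters = new Filter[] {EqualTo.apply("id", 1), IsNull.apply("name")};
    SparkFilterConverter converter = new SparkFilterConverter(rowType);

    Predicate[] predicates =
            Arrays.stream(filters)
                    .map(converter::convertIgnoreFailure) // null for unconvertible filters
                    .toArray(Predicate[]::new);
    Predicate combined = PredicateBuilder.and(predicates);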
SparkFilterConverterTest.java
@@ -26,6 +26,7 @@
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
 
 import org.apache.spark.sql.sources.EqualNullSafe;
 import org.apache.spark.sql.sources.EqualTo;
@@ -36,17 +37,22 @@
 import org.apache.spark.sql.sources.IsNull;
 import org.apache.spark.sql.sources.LessThan;
 import org.apache.spark.sql.sources.LessThanOrEqual;
+import org.apache.spark.sql.sources.Not;
+import org.apache.spark.sql.sources.StringStartsWith;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Date;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowableOfType;
 
 /** Test for {@link SparkFilterConverter}. */
 public class SparkFilterConverterTest {
@@ -183,4 +189,17 @@ public void testDate() {
         assertThat(dateExpression).isEqualTo(rawExpression);
         assertThat(localDateExpression).isEqualTo(rawExpression);
     }
+
+    @Test
+    public void testIgnoreFailure() {
+        List<DataField> dataFields = new ArrayList<>();
+        dataFields.add(new DataField(0, "id", new IntType()));
+        dataFields.add(new DataField(1, "name", new VarCharType(VarCharType.MAX_LENGTH)));
+        RowType rowType = new RowType(dataFields);
+        SparkFilterConverter converter = new SparkFilterConverter(rowType);
+
+        Not not = Not.apply(StringStartsWith.apply("name", "paimon"));
+        catchThrowableOfType(() -> converter.convert(not), UnsupportedOperationException.class);
+        assertThat(converter.convertIgnoreFailure(not)).isNull();
+    }
 }
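
The new test pins down both halves of the contract: convert still throws UnsupportedOperationException for an unsupported filter such as Not(StringStartsWith(...)), while convertIgnoreFailure maps the same failure to null.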
