diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/AlwaysFalse.java b/paimon-common/src/main/java/org/apache/paimon/predicate/AlwaysFalse.java new file mode 100644 index 000000000000..7b4d7fa87ecd --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/AlwaysFalse.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.types.DataType; + +import java.util.List; +import java.util.Optional; + +/** A {@link LeafFunction} that always eval to `false`. */ +public class AlwaysFalse extends LeafFunction { + + private static final long serialVersionUID = 1L; + + public static final AlwaysFalse INSTANCE = new AlwaysFalse(); + + private AlwaysFalse() {} + + @Override + public boolean test() { + return false; + } + + @Override + public boolean test(DataType type, Object field, List literals) { + return false; + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + List literals) { + return false; + } + + @Override + public Optional negate() { + return Optional.of(AlwaysTrue.INSTANCE); + } + + @Override + public T visit(FunctionVisitor visitor, FieldRef fieldRef, List literals) { + throw new UnsupportedOperationException(); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/AlwaysTrue.java b/paimon-common/src/main/java/org/apache/paimon/predicate/AlwaysTrue.java new file mode 100644 index 000000000000..248b2d8bced1 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/AlwaysTrue.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.types.DataType; + +import java.util.List; +import java.util.Optional; + +/** A {@link LeafFunction} that always eval to `true`. */ +public class AlwaysTrue extends LeafFunction { + + private static final long serialVersionUID = 1L; + + public static final AlwaysTrue INSTANCE = new AlwaysTrue(); + + private AlwaysTrue() {} + + @Override + public boolean test() { + return true; + } + + @Override + public boolean test(DataType type, Object field, List literals) { + return true; + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + List literals) { + return true; + } + + @Override + public Optional negate() { + return Optional.of(AlwaysFalse.INSTANCE); + } + + @Override + public T visit(FunctionVisitor visitor, FieldRef fieldRef, List literals) { + throw new UnsupportedOperationException(); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java index 7a1e3d0f5f51..27ea11071e5b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java @@ -27,6 +27,10 @@ /** Function to test a field with literals. */ public abstract class LeafFunction implements Serializable { + public boolean test() { + throw new UnsupportedOperationException(); + } + public abstract boolean test(DataType type, Object field, List literals); public abstract boolean test( diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java index 5267f7069f90..2ed68335fd60 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java @@ -91,12 +91,18 @@ public LeafPredicate copyWithNewIndex(int fieldIndex) { @Override public boolean test(InternalRow row) { + if (function.equals(AlwaysTrue.INSTANCE) || function.equals(AlwaysFalse.INSTANCE)) { + return function.test(); + } return function.test(type, get(row, fieldIndex, type), literals); } @Override public boolean test( long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts) { + if (function.equals(AlwaysTrue.INSTANCE) || function.equals(AlwaysFalse.INSTANCE)) { + return function.test(); + } Object min = get(minValues, fieldIndex, type); Object max = get(maxValues, fieldIndex, type); Long nullCount = nullCounts.isNullAt(fieldIndex) ? null : nullCounts.getLong(fieldIndex); diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java index ee84d59d85b4..7ee43c1f2636 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java @@ -71,6 +71,14 @@ public int indexOf(String field) { return fieldNames.indexOf(field); } + public Predicate alwaysTrue() { + return new LeafPredicate(AlwaysTrue.INSTANCE, null, -1, null, Collections.emptyList()); + } + + public Predicate alwaysFalse() { + return new LeafPredicate(AlwaysFalse.INSTANCE, null, -1, null, Collections.emptyList()); + } + public Predicate equal(int idx, Object literal) { return leaf(Equal.INSTANCE, idx, literal); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java index 4f7cee52ce43..62e1ec429967 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java @@ -23,7 +23,10 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; +import org.apache.spark.sql.sources.AlwaysFalse; +import org.apache.spark.sql.sources.AlwaysTrue; import org.apache.spark.sql.sources.And; +import org.apache.spark.sql.sources.EqualNullSafe; import org.apache.spark.sql.sources.EqualTo; import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.GreaterThan; @@ -48,7 +51,10 @@ public class SparkFilterConverter { public static final List SUPPORT_FILTERS = Arrays.asList( + "AlwaysTrue", + "AlwaysFalse", "EqualTo", + "EqualNullSafe", "GreaterThan", "GreaterThanOrEqual", "LessThan", @@ -70,12 +76,25 @@ public SparkFilterConverter(RowType rowType) { } public Predicate convert(Filter filter) { - if (filter instanceof EqualTo) { + if (filter instanceof AlwaysTrue) { + return builder.alwaysTrue(); + } else if (filter instanceof AlwaysFalse) { + return builder.alwaysFalse(); + } else if (filter instanceof EqualTo) { EqualTo eq = (EqualTo) filter; // TODO deal with isNaN int index = fieldIndex(eq.attribute()); Object literal = convertLiteral(index, eq.value()); return builder.equal(index, literal); + } else if (filter instanceof EqualNullSafe) { + EqualNullSafe eq = (EqualNullSafe) filter; + if (eq.value() == null) { + return builder.isNull(fieldIndex(eq.attribute())); + } else { + int index = fieldIndex(eq.attribute()); + Object literal = convertLiteral(index, eq.value()); + return builder.equal(index, literal); + } } else if (filter instanceof GreaterThan) { GreaterThan gt = (GreaterThan) filter; int index = fieldIndex(gt.attribute()); @@ -123,16 +142,12 @@ public Predicate convert(Filter filter) { Object literal = convertLiteral(index, startsWith.value()); return builder.startsWith(index, literal); } - - // TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe throw new UnsupportedOperationException( filter + " is unsupported. Support Filters: " + SUPPORT_FILTERS); } public Object convertLiteral(String field, Object value) { - int index = fieldIndex(field); - DataType type = rowType.getTypeAt(index); - return convertJavaObject(type, value); + return convertLiteral(fieldIndex(field), value); } private int fieldIndex(String field) { diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java index 4f7e4643376b..506fd92a4743 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java @@ -27,6 +27,9 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; +import org.apache.spark.sql.sources.AlwaysFalse; +import org.apache.spark.sql.sources.AlwaysTrue; +import org.apache.spark.sql.sources.EqualNullSafe; import org.apache.spark.sql.sources.EqualTo; import org.apache.spark.sql.sources.GreaterThan; import org.apache.spark.sql.sources.GreaterThanOrEqual; @@ -57,6 +60,16 @@ public void testAll() { SparkFilterConverter converter = new SparkFilterConverter(rowType); PredicateBuilder builder = new PredicateBuilder(rowType); + AlwaysTrue alwaysTrue = AlwaysTrue.apply(); + Predicate expectedAlwaysTrue = builder.alwaysTrue(); + Predicate actualAlwaysTrue = converter.convert(alwaysTrue); + assertThat(actualAlwaysTrue).isEqualTo(expectedAlwaysTrue); + + AlwaysFalse alwaysFalse = AlwaysFalse.apply(); + Predicate expectedAlwaysFalse = builder.alwaysFalse(); + Predicate actualAlwaysFalse = converter.convert(alwaysFalse); + assertThat(actualAlwaysFalse).isEqualTo(expectedAlwaysFalse); + String field = "id"; IsNull isNull = IsNull.apply(field); Predicate expectedIsNull = builder.isNull(0); @@ -118,6 +131,16 @@ public void testAll() { Predicate actualEqNull = converter.convert(eqNull); assertThat(actualEqNull).isEqualTo(expectedEqNull); + EqualNullSafe eqSafe = EqualNullSafe.apply(field, 1); + Predicate expectedEqSafe = builder.equal(0, 1); + Predicate actualEqSafe = converter.convert(eqSafe); + assertThat(actualEqSafe).isEqualTo(expectedEqSafe); + + EqualNullSafe eqNullSafe = EqualNullSafe.apply(field, null); + Predicate expectEqNullSafe = builder.isNull(0); + Predicate actualEqNullSafe = converter.convert(eqNullSafe); + assertThat(actualEqNullSafe).isEqualTo(expectEqNullSafe); + In in = In.apply(field, new Object[] {1, null, 2}); Predicate expectedIn = builder.in(0, Arrays.asList(1, null, 2)); Predicate actualIn = converter.convert(in);