Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Feb 19, 2024
1 parent a0062d5 commit bb19fac
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.predicate;

import org.apache.paimon.types.DataType;

import java.util.List;
import java.util.Optional;

/** A {@link LeafFunction} that always evaluates to {@code false}. */
public class AlwaysFalse extends LeafFunction {

    private static final long serialVersionUID = 1L;

    /** Shared singleton; the private constructor keeps this the only live instance. */
    public static final AlwaysFalse INSTANCE = new AlwaysFalse();

    private AlwaysFalse() {}

    @Override
    public boolean test() {
        return false;
    }

    /** Always {@code false}, regardless of the field value or literals. */
    @Override
    public boolean test(DataType type, Object field, List<Object> literals) {
        return false;
    }

    /** Always {@code false}; no statistics can make this predicate match. */
    @Override
    public boolean test(
            DataType type,
            long rowCount,
            Object min,
            Object max,
            Long nullCount,
            List<Object> literals) {
        return false;
    }

    /** The negation of "always false" is "always true". */
    @Override
    public Optional<LeafFunction> negate() {
        return Optional.of(AlwaysTrue.INSTANCE);
    }

    @Override
    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
        // Constant predicates have no visitor hook; callers must special-case them.
        throw new UnsupportedOperationException();
    }

    /**
     * Preserves the singleton across Java serialization. Without this, deserialization
     * would create a distinct instance, and identity/equals checks against
     * {@link #INSTANCE} performed elsewhere would silently fail.
     */
    private Object readResolve() {
        return INSTANCE;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.predicate;

import org.apache.paimon.types.DataType;

import java.util.List;
import java.util.Optional;

/** A {@link LeafFunction} that always evaluates to {@code true}. */
public class AlwaysTrue extends LeafFunction {

    private static final long serialVersionUID = 1L;

    /** Shared singleton; the private constructor keeps this the only live instance. */
    public static final AlwaysTrue INSTANCE = new AlwaysTrue();

    private AlwaysTrue() {}

    @Override
    public boolean test() {
        return true;
    }

    /** Always {@code true}, regardless of the field value or literals. */
    @Override
    public boolean test(DataType type, Object field, List<Object> literals) {
        return true;
    }

    /** Always {@code true}; statistics never rule this predicate out. */
    @Override
    public boolean test(
            DataType type,
            long rowCount,
            Object min,
            Object max,
            Long nullCount,
            List<Object> literals) {
        return true;
    }

    /** The negation of "always true" is "always false". */
    @Override
    public Optional<LeafFunction> negate() {
        return Optional.of(AlwaysFalse.INSTANCE);
    }

    @Override
    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
        // Constant predicates have no visitor hook; callers must special-case them.
        throw new UnsupportedOperationException();
    }

    /**
     * Preserves the singleton across Java serialization. Without this, deserialization
     * would create a distinct instance, and identity/equals checks against
     * {@link #INSTANCE} performed elsewhere would silently fail.
     */
    private Object readResolve() {
        return INSTANCE;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
/** Function to test a field with literals. */
public abstract class LeafFunction implements Serializable {

/**
 * Evaluates this function without reading any field value. Only constant functions
 * (always-true / always-false implementations) support this form.
 *
 * @throws UnsupportedOperationException for functions that require a field value
 */
public boolean test() {
    throw new UnsupportedOperationException();
}

public abstract boolean test(DataType type, Object field, List<Object> literals);

public abstract boolean test(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,18 @@ public LeafPredicate copyWithNewIndex(int fieldIndex) {

@Override
public boolean test(InternalRow row) {
    // Constant functions have no field to read. Use instanceof rather than
    // equals(INSTANCE): the singletons rely on identity equality, which breaks
    // for copies produced by Java deserialization; instanceof is robust to that.
    if (function instanceof AlwaysTrue || function instanceof AlwaysFalse) {
        return function.test();
    }
    return function.test(type, get(row, fieldIndex, type), literals);
}

@Override
public boolean test(
long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts) {
if (function.equals(AlwaysTrue.INSTANCE) || function.equals(AlwaysFalse.INSTANCE)) {
return function.test();
}
Object min = get(minValues, fieldIndex, type);
Object max = get(maxValues, fieldIndex, type);
Long nullCount = nullCounts.isNullAt(fieldIndex) ? null : nullCounts.getLong(fieldIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ public int indexOf(String field) {
return fieldNames.indexOf(field);
}

/**
 * Creates a predicate that accepts every row.
 *
 * <p>Passes null type/name and a -1 field index because the constant function never
 * reads a field. NOTE(review): assumes {@code LeafPredicate} tolerates these
 * sentinels and that callers never resolve this predicate's field — verify.
 */
public Predicate alwaysTrue() {
    return new LeafPredicate(AlwaysTrue.INSTANCE, null, -1, null, Collections.emptyList());
}

/**
 * Creates a predicate that rejects every row.
 *
 * <p>Passes null type/name and a -1 field index because the constant function never
 * reads a field. NOTE(review): assumes {@code LeafPredicate} tolerates these
 * sentinels and that callers never resolve this predicate's field — verify.
 */
public Predicate alwaysFalse() {
    return new LeafPredicate(AlwaysFalse.INSTANCE, null, -1, null, Collections.emptyList());
}

public Predicate equal(int idx, Object literal) {
return leaf(Equal.INSTANCE, idx, literal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

import org.apache.spark.sql.sources.AlwaysFalse;
import org.apache.spark.sql.sources.AlwaysTrue;
import org.apache.spark.sql.sources.And;
import org.apache.spark.sql.sources.EqualNullSafe;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
Expand All @@ -48,7 +51,10 @@ public class SparkFilterConverter {

public static final List<String> SUPPORT_FILTERS =
Arrays.asList(
"AlwaysTrue",
"AlwaysFalse",
"EqualTo",
"EqualNullSafe",
"GreaterThan",
"GreaterThanOrEqual",
"LessThan",
Expand All @@ -70,12 +76,25 @@ public SparkFilterConverter(RowType rowType) {
}

public Predicate convert(Filter filter) {
if (filter instanceof EqualTo) {
if (filter instanceof AlwaysTrue) {
return builder.alwaysTrue();
} else if (filter instanceof AlwaysFalse) {
return builder.alwaysFalse();
} else if (filter instanceof EqualTo) {
EqualTo eq = (EqualTo) filter;
// TODO deal with isNaN
int index = fieldIndex(eq.attribute());
Object literal = convertLiteral(index, eq.value());
return builder.equal(index, literal);
} else if (filter instanceof EqualNullSafe) {
EqualNullSafe eq = (EqualNullSafe) filter;
if (eq.value() == null) {
return builder.isNull(fieldIndex(eq.attribute()));
} else {
int index = fieldIndex(eq.attribute());
Object literal = convertLiteral(index, eq.value());
return builder.equal(index, literal);
}
} else if (filter instanceof GreaterThan) {
GreaterThan gt = (GreaterThan) filter;
int index = fieldIndex(gt.attribute());
Expand Down Expand Up @@ -123,16 +142,12 @@ public Predicate convert(Filter filter) {
Object literal = convertLiteral(index, startsWith.value());
return builder.startsWith(index, literal);
}

// TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
throw new UnsupportedOperationException(
filter + " is unsupported. Support Filters: " + SUPPORT_FILTERS);
}

/**
 * Converts a Spark-side literal for the named field into Paimon's internal
 * representation.
 *
 * <p>Delegates to the index-based overload so conversion logic lives in one place.
 * (The scraped span contained both the pre- and post-commit bodies — two consecutive
 * return statements, the second unreachable; this is the committed delegating form.)
 *
 * @param field field name used to resolve the target type
 * @param value Spark-side literal value
 * @return the converted literal
 */
public Object convertLiteral(String field, Object value) {
    return convertLiteral(fieldIndex(field), value);
}

private int fieldIndex(String field) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;

import org.apache.spark.sql.sources.AlwaysFalse;
import org.apache.spark.sql.sources.AlwaysTrue;
import org.apache.spark.sql.sources.EqualNullSafe;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.GreaterThanOrEqual;
Expand Down Expand Up @@ -57,6 +60,16 @@ public void testAll() {
SparkFilterConverter converter = new SparkFilterConverter(rowType);
PredicateBuilder builder = new PredicateBuilder(rowType);

AlwaysTrue alwaysTrue = AlwaysTrue.apply();
Predicate expectedAlwaysTrue = builder.alwaysTrue();
Predicate actualAlwaysTrue = converter.convert(alwaysTrue);
assertThat(actualAlwaysTrue).isEqualTo(expectedAlwaysTrue);

AlwaysFalse alwaysFalse = AlwaysFalse.apply();
Predicate expectedAlwaysFalse = builder.alwaysFalse();
Predicate actualAlwaysFalse = converter.convert(alwaysFalse);
assertThat(actualAlwaysFalse).isEqualTo(expectedAlwaysFalse);

String field = "id";
IsNull isNull = IsNull.apply(field);
Predicate expectedIsNull = builder.isNull(0);
Expand Down Expand Up @@ -118,6 +131,16 @@ public void testAll() {
Predicate actualEqNull = converter.convert(eqNull);
assertThat(actualEqNull).isEqualTo(expectedEqNull);

EqualNullSafe eqSafe = EqualNullSafe.apply(field, 1);
Predicate expectedEqSafe = builder.equal(0, 1);
Predicate actualEqSafe = converter.convert(eqSafe);
assertThat(actualEqSafe).isEqualTo(expectedEqSafe);

EqualNullSafe eqNullSafe = EqualNullSafe.apply(field, null);
Predicate expectEqNullSafe = builder.isNull(0);
Predicate actualEqNullSafe = converter.convert(eqNullSafe);
assertThat(actualEqNullSafe).isEqualTo(expectEqNullSafe);

In in = In.apply(field, new Object[] {1, null, 2});
Predicate expectedIn = builder.in(0, Arrays.asList(1, null, 2));
Predicate actualIn = converter.convert(in);
Expand Down

0 comments on commit bb19fac

Please sign in to comment.