Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Dec 17, 2024
1 parent d71e946 commit 1c6a37a
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 172 deletions.
140 changes: 10 additions & 130 deletions paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,18 @@

package org.apache.paimon.casting;

import org.apache.paimon.data.Decimal;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.GreaterOrEqual;
import org.apache.paimon.predicate.GreaterThan;
import org.apache.paimon.predicate.In;
import org.apache.paimon.predicate.LeafFunction;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LessOrEqual;
import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.NotEqual;
import org.apache.paimon.predicate.NotIn;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DecimalType;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Cast executors for input type and output type. */
Expand Down Expand Up @@ -116,138 +102,32 @@ public class CastExecutors {
}

/**
* When filtering a field which was evolved from/to a numeric type, we should carefully handle
* the precision mismatch and overflow problems.
* If a field's type has been modified, pushing down a filter on it is dangerous. This method
* tries to cast the literals of the filter to the field's original type. It only casts the
* literals when the CastRule is in the whitelist. Otherwise, it returns Optional.empty().
*/
@Nullable
public static List<Object> safelyCastLiteralsWithNumericEvolution(
LeafPredicate predicate, DataType outputType) {
DataType inputType = predicate.type();
public static Optional<List<Object>> castLiteralsWithEvolution(
List<Object> literals, DataType inputType, DataType outputType) {
if (inputType.equalsIgnoreNullable(outputType)) {
return predicate.literals();
return Optional.of(literals);
}

List<Object> literals = predicate.literals();

CastRule<?, ?> castRule = INSTANCE.internalResolve(inputType, outputType);
if (castRule == null) {
return literals;
return Optional.empty();
}

if (castRule instanceof DecimalToDecimalCastRule) {
if (((DecimalType) inputType).getPrecision() < ((DecimalType) outputType).getPrecision()
&& containsEqualCheck(predicate)) {
// For example, alter 111.321 from DECIMAL(6, 3) to DECIMAL(5, 2).
// The query result is 111.32 which is truncated from 111.321.
// If we query with filter f = 111.32 and push down it, 111.321 will be filtered
// out mistakenly.
// But if we query with filter f > 111.32, although 111.321 will be retrieved,
// the engine will filter out it finally.
return null;
}
// Pushing down higher precision filter is always correct.
return literals;
} else if (castRule instanceof NumericPrimitiveToDecimalCastRule) {
if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) && containsEqualCheck(predicate)) {
// the reason is same as DecimalToDecimalCastRule
return null;
}
return literals.stream()
.map(literal -> (Number) literal)
.map(
literal ->
inputType.is(DataTypeFamily.INTEGER_NUMERIC)
? BigDecimal.valueOf(literal.longValue())
: BigDecimal.valueOf(literal.doubleValue()))
.map(bd -> Decimal.fromBigDecimal(bd, bd.precision(), bd.scale()))
.collect(Collectors.toList());
} else if (castRule instanceof DecimalToNumericPrimitiveCastRule) {
if (outputType.is(DataTypeFamily.INTEGER_NUMERIC)
&& (containsPartialCheck(predicate) || containsNotEqualCheck(predicate))) {
// For example, alter 111 from INT to DECIMAL(5, 2). The query result is 111.00
// If we query with filter f < 111.01 and push down it as f < 111, 111 will be
// filtered out mistakenly. Also, we shouldn't push down f <> 111.01.
// But if we query with filter f = 111.01 and push down it as f = 111, although 111
// will be retrieved, the engine will filter out it finally.
// TODO: maybe we can scale the partial filter. For example, f < 111.01 can be
// transformed to f < 112.
return null;
} else if (outputType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& containsEqualCheck(predicate)) {
// For example, alter 111.321 from DOUBLE to DECIMAL(5, 2). The query result is
// 111.32.
// If we query with filter f = 111.32 and push down it, 111.321 will be filtered
// out mistakenly.
// But if we query with filter f > 111.32 or f <> 111.32, although 111.321 will be
// retrieved, the engine will filter out it finally.
return null;
}
castLiterals(castRule, inputType, outputType, literals);
} else if (castRule instanceof NumericPrimitiveCastRule) {
if (castRule instanceof NumericPrimitiveCastRule) {
if (inputType.is(DataTypeFamily.INTEGER_NUMERIC)
&& outputType.is(DataTypeFamily.INTEGER_NUMERIC)) {
if (integerScaleLargerThan(inputType.getTypeRoot(), outputType.getTypeRoot())) {
// Pushing down higher scale integer numeric filter is always correct.
return literals;
return Optional.of(literals);
}
}

// Pushing down float filter is dangerous because the filter result is unpredictable.
// For example, (double) 0.1F in Java is 0.10000000149011612.

// Pushing down lower scale filter is also dangerous because of overflow.
// For example, alter 383 from INT to TINYINT, the query result is (byte) 383 == 127.
// If we push down filter f = 127, 383 will be filtered out which is wrong.

// So we don't push down these filters.
return null;
} else if (castRule instanceof NumericToStringCastRule
|| castRule instanceof StringToDecimalCastRule
|| castRule instanceof StringToNumericPrimitiveCastRule) {
// Pushing down filters related to STRING is dangerous because string comparison is
// different from number comparison and string literal to number might have precision
// and overflow problem.
// For example, alter '111' from STRING to INT, the query result is 111.
// If we query with filter f > 2 and push down it as f > '2', '111' will be filtered
// out mistakenly.
return null;
}

// Non numeric related cast rule
return castLiterals(castRule, inputType, outputType, literals);
}

/**
 * Converts every literal with the executor that {@code castRule} creates for the given type
 * pair.
 *
 * @param castRule rule used to create the cast executor
 * @param inputType type handed to the rule as the source type
 * @param outputType type handed to the rule as the target type
 * @param literals values to convert
 * @return a new list holding the converted values in their original order; if the rule creates
 *     no executor ({@code null}), the values are copied over unchanged
 */
@SuppressWarnings("unchecked")
private static List<Object> castLiterals(
        CastRule<?, ?> castRule,
        DataType inputType,
        DataType outputType,
        List<Object> literals) {
    // The rule is only known as CastRule<?, ?>, so the executor's type arguments cannot be
    // verified statically; treat both input and output as plain Object.
    CastExecutor<Object, Object> castExecutor =
            (CastExecutor<Object, Object>) castRule.create(inputType, outputType);
    return literals.stream()
            .map(literal -> castExecutor == null ? literal : castExecutor.cast(literal))
            .collect(Collectors.toList());
}

/**
 * Tells whether the predicate's function is one of {@link In}, {@link Equal}, {@link
 * GreaterOrEqual} or {@link LessOrEqual}.
 */
private static boolean containsEqualCheck(LeafPredicate predicate) {
    LeafFunction fn = predicate.function();
    if (fn instanceof In || fn instanceof Equal) {
        return true;
    }
    // The inclusive range comparisons are counted as well.
    return fn instanceof GreaterOrEqual || fn instanceof LessOrEqual;
}

/**
 * Tells whether the predicate's function is a range comparison: {@link LessThan}, {@link
 * LessOrEqual}, {@link GreaterThan} or {@link GreaterOrEqual}.
 */
private static boolean containsPartialCheck(LeafPredicate predicate) {
    LeafFunction fn = predicate.function();
    boolean upperBound = fn instanceof LessThan || fn instanceof LessOrEqual;
    boolean lowerBound = fn instanceof GreaterThan || fn instanceof GreaterOrEqual;
    return upperBound || lowerBound;
}

private static boolean containsNotEqualCheck(LeafPredicate predicate) {
LeafFunction function = predicate.function();
return function instanceof NotIn || function instanceof NotEqual;
return Optional.empty();
}

private static boolean integerScaleLargerThan(DataTypeRoot a, DataTypeRoot b) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,19 +310,16 @@ public static List<Predicate> devolveDataFilters(
return Optional.empty();
}

List<Object> devolvedLiterals =
CastExecutors.safelyCastLiteralsWithNumericEvolution(
predicate, dataField.type());

return devolvedLiterals == null
? Optional.empty()
: Optional.of(
new LeafPredicate(
predicate.function(),
dataField.type(),
indexOf(dataField, idToDataFields),
dataField.name(),
devolvedLiterals));
return CastExecutors.castLiteralsWithEvolution(
predicate.literals(), predicate.type(), dataField.type())
.map(
literals ->
new LeafPredicate(
predicate.function(),
dataField.type(),
indexOf(dataField, idToDataFields),
dataField.name(),
literals));
};

for (Predicate predicate : filters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

package org.apache.paimon.schema;

import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.IsNotNull;
import org.apache.paimon.predicate.IsNull;
import org.apache.paimon.predicate.LeafPredicate;
Expand All @@ -38,7 +36,6 @@

import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -289,29 +286,4 @@ public void testDevolveDataFilters() {
assertThat(child1.fieldName()).isEqualTo("b");
assertThat(child1.index()).isEqualTo(1);
}

/**
 * Devolves an equality filter on a column whose type was altered and checks the resulting
 * predicate against rows of the data schema.
 */
@Test
public void testColumnTypeFilter() {
    // alter d from INT to DECIMAL(10, 2)
    // filter d = 11.01 will be devolved to d = 11
    List<Predicate> predicates = new ArrayList<>();
    predicates.add(
            new LeafPredicate(
                    Equal.INSTANCE,
                    DataTypes.DECIMAL(10, 2),
                    0,
                    "d",
                    Collections.singletonList(
                            Decimal.fromBigDecimal(new BigDecimal("11.01"), 10, 2))));
    List<Predicate> filters =
            SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates);
    // Use AssertJ rather than the `assert` keyword, which is skipped when the JVM runs
    // without -ea and gives no diagnostic message on failure.
    assertThat(filters).isNotNull();
    assertThat(filters).hasSize(1);

    LeafPredicate child = (LeafPredicate) filters.get(0);
    // Validate value 11 with index 3
    assertThat(child.test(GenericRow.of(0, 0, 0, 11))).isTrue();
    // Validate value 12 with index 3
    assertThat(child.test(GenericRow.of(1, 0, 0, 12))).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.types.DataType;

import org.apache.flink.types.Row;
import org.junit.jupiter.api.TestTemplate;
Expand All @@ -32,7 +33,7 @@

import static org.assertj.core.api.Assertions.assertThat;

/** ITCase for {@link CastExecutors#safelyCastLiteralsWithNumericEvolution}. */
/** ITCase for {@link CastExecutors#castLiteralsWithEvolution(List, DataType, DataType)}. */
@ExtendWith(ParameterizedTestExtension.class)
public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase {

Expand Down

0 comments on commit 1c6a37a

Please sign in to comment.