Skip to content

Commit

Permalink
[core] Fix predicate literals cast in filter pushdown after schema ev…
Browse files Browse the repository at this point in the history
…olution (#4705)
  • Loading branch information
yuzelin authored Dec 18, 2024
1 parent a057fd1 commit 0b684ca
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -100,6 +102,59 @@ public class CastExecutors {
return IDENTITY_CAST_EXECUTOR;
}

/**
 * Attempts to convert the literals of a filter from an evolved field type back to the
 * field's original (underlying data) type so that the filter can be pushed down safely.
 *
 * <p>After a schema evolution changed a field's type, pushing down a predicate built
 * against the new type is dangerous. Conversion is therefore attempted only for a small
 * whitelist of cast rules; in every other case {@link Optional#empty()} is returned.
 *
 * @param literals the predicate literals, typed according to {@code predicateType}
 * @param predicateType the (evolved) type the predicate was built against
 * @param dataType the original type of the underlying data
 * @return the converted literals, or {@link Optional#empty()} when conversion is unsafe
 */
public static Optional<List<Object>> castLiteralsWithEvolution(
        List<Object> literals, DataType predicateType, DataType dataType) {
    // Identical types (ignoring nullability): nothing to convert.
    if (predicateType.equalsIgnoreNullable(dataType)) {
        return Optional.of(literals);
    }

    CastRule<?, ?> castRule = INSTANCE.internalResolve(predicateType, dataType);
    if (castRule == null) {
        return Optional.empty();
    }

    // Whitelist: only numeric primitive casts are considered at all.
    if (!(castRule instanceof NumericPrimitiveCastRule)) {
        return Optional.empty();
    }

    // Float/double literals are excluded because the pushed-down result is unpredictable.
    // For example, (double) 0.1F in Java is 0.10000000149011612.
    if (!predicateType.is(DataTypeFamily.INTEGER_NUMERIC)
            || !dataType.is(DataTypeFamily.INTEGER_NUMERIC)) {
        return Optional.empty();
    }

    // Only narrowing casts (predicate scale > data scale) are handled; widening is
    // rejected because of overflow in the opposite direction. For example, after
    // altering 383 from INT to TINYINT the query result is (byte) 383 == 127; pushing
    // down filter f = 127 would mistakenly filter out 383.
    if (!integerScaleLargerThan(predicateType.getTypeRoot(), dataType.getTypeRoot())) {
        return Optional.empty();
    }

    @SuppressWarnings("unchecked")
    CastExecutor<Number, Number> castExecutor =
            (CastExecutor<Number, Number>) castRule.create(predicateType, dataType);
    List<Object> converted = new ArrayList<>(literals.size());
    for (Object literal : literals) {
        Number original = (Number) literal;
        Number casted = castExecutor.cast(original);
        // Abort the whole conversion if any single literal overflowed.
        if (casted.longValue() != original.longValue()) {
            return Optional.empty();
        }
        converted.add(casted);
    }
    return Optional.of(converted);
}

/**
 * Returns whether integer type root {@code a} has a strictly larger scale (value range)
 * than integer type root {@code b}.
 *
 * <p>Fixes the previous formulation which returned {@code true} for equal roots (e.g.
 * {@code BIGINT} vs {@code BIGINT}) and relied on callers filtering out equal types
 * beforehand; the comparison is now strict regardless of the inputs.
 *
 * @param a an integer numeric type root (TINYINT, SMALLINT, INTEGER or BIGINT)
 * @param b an integer numeric type root (TINYINT, SMALLINT, INTEGER or BIGINT)
 * @return true iff {@code a}'s range strictly contains {@code b}'s
 */
private static boolean integerScaleLargerThan(DataTypeRoot a, DataTypeRoot b) {
    return integerScaleOf(a) > integerScaleOf(b);
}

/** Maps an integer numeric type root to a relative scale rank; -1 for any other root. */
private static int integerScaleOf(DataTypeRoot root) {
    if (root == DataTypeRoot.TINYINT) {
        return 1;
    } else if (root == DataTypeRoot.SMALLINT) {
        return 2;
    } else if (root == DataTypeRoot.INTEGER) {
        return 3;
    } else if (root == DataTypeRoot.BIGINT) {
        return 4;
    }
    return -1;
}

// Map<Target family or root, Map<Input family or root, rule>>
private final Map<Object, Map<Object, CastRule<?, ?>>> rules = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry
Predicate dataPredicate =
dataFilterMapping.computeIfAbsent(
entry.file().schemaId(),
id -> simpleStatsEvolutions.convertFilter(entry.file().schemaId(), filter));
id ->
simpleStatsEvolutions.tryDevolveFilter(
entry.file().schemaId(), filter));

try (FileIndexPredicate predicate =
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE
schemaId2DataFilter.computeIfAbsent(
entry.file().schemaId(),
id ->
fieldValueStatsConverters.convertFilter(
fieldValueStatsConverters.tryDevolveFilter(
entry.file().schemaId(), valueFilter));
return predicate.evaluate(dataPredicate).remain();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,18 @@ public static int[][] createDataProjection(
}

/**
* Create predicate list from data fields. We will visit all predicate in filters, reset it's
* field index, name and type, and ignore predicate if the field is not exist.
* When pushing down filters after schema evolution, we should devolve the literals from the
* new types (in dataFields) to the original types (in tableFields). We visit every predicate
* in filters, reset its field index, name and type, and ignore the predicate if the field
* does not exist.
*
* @param tableFields the table fields
* @param dataFields the underlying data fields
* @param filters the filters
* @return the data filters
*/
@Nullable
public static List<Predicate> createDataFilters(
public static List<Predicate> devolveDataFilters(
List<DataField> tableFields, List<DataField> dataFields, List<Predicate> filters) {
if (filters == null) {
return null;
Expand All @@ -308,29 +310,16 @@ public static List<Predicate> createDataFilters(
return Optional.empty();
}

DataType dataValueType = dataField.type().copy(true);
DataType predicateType = predicate.type().copy(true);
CastExecutor<Object, Object> castExecutor =
dataValueType.equals(predicateType)
? null
: (CastExecutor<Object, Object>)
CastExecutors.resolve(
predicate.type(), dataField.type());
// Convert value from predicate type to underlying data type which may lose
// information, for example, convert double value to int. But it doesn't matter
// because it just for predicate push down and the data will be filtered
// correctly after reading.
List<Object> literals =
predicate.literals().stream()
.map(v -> castExecutor == null ? v : castExecutor.cast(v))
.collect(Collectors.toList());
return Optional.of(
new LeafPredicate(
predicate.function(),
dataField.type(),
indexOf(dataField, idToDataFields),
dataField.name(),
literals));
return CastExecutors.castLiteralsWithEvolution(
predicate.literals(), predicate.type(), dataField.type())
.map(
literals ->
new LeafPredicate(
predicate.function(),
dataField.type(),
indexOf(dataField, idToDataFields),
dataField.name(),
literals));
};

for (Predicate predicate : filters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,18 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) {
});
}

public Predicate convertFilter(long dataSchemaId, Predicate filter) {
return tableSchemaId == dataSchemaId
? filter
: Objects.requireNonNull(
SchemaEvolutionUtil.createDataFilters(
schemaFields.apply(tableSchemaId),
schemaFields.apply(dataSchemaId),
Collections.singletonList(filter)))
.get(0);
/**
 * Devolves the given filter from the current table schema to the data schema identified by
 * {@code dataSchemaId}, so it can be applied to files written with the older schema.
 *
 * @param dataSchemaId id of the schema the underlying data was written with
 * @param filter the predicate expressed against the table schema
 * @return the devolved predicate, or {@code null} if the filter cannot be devolved safely
 */
@Nullable
public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) {
    // Same schema: the filter applies to the data files unchanged.
    if (tableSchemaId == dataSchemaId) {
        return filter;
    }
    List<Predicate> devolvedFilters =
            Objects.requireNonNull(
                    SchemaEvolutionUtil.devolveDataFilters(
                            schemaFields.apply(tableSchemaId),
                            schemaFields.apply(dataSchemaId),
                            Collections.singletonList(filter)));
    if (devolvedFilters.isEmpty()) {
        // The single input filter could not be devolved (e.g. unsafe literal cast).
        return null;
    }
    return devolvedFilters.get(0);
}

public List<DataField> tableDataFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private List<Predicate> readFilters(
List<Predicate> dataFilters =
tableSchema.id() == dataSchema.id()
? filters
: SchemaEvolutionUtil.createDataFilters(
: SchemaEvolutionUtil.devolveDataFilters(
tableSchema.fields(), dataSchema.fields(), filters);

// Skip pushing down partition filters to reader.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.IsNotNull;
import org.apache.paimon.predicate.IsNull;
import org.apache.paimon.predicate.LeafPredicate;
Expand Down Expand Up @@ -263,7 +262,7 @@ public void testCreateDataProjection() {
}

@Test
public void testCreateDataFilters() {
public void testDevolveDataFilters() {
List<Predicate> predicates = new ArrayList<>();
predicates.add(
new LeafPredicate(
Expand All @@ -278,7 +277,7 @@ public void testCreateDataFilters() {
IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList()));

List<Predicate> filters =
SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates);
SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates);
assert filters != null;
assertThat(filters.size()).isEqualTo(1);

Expand All @@ -287,27 +286,4 @@ public void testCreateDataFilters() {
assertThat(child1.fieldName()).isEqualTo("b");
assertThat(child1.index()).isEqualTo(1);
}

@Test
public void testColumnTypeFilter() {
    // Field (1, b, int) in the data schema was evolved to (1, c, double) in table2;
    // a predicate on the evolved column must be devolved to the data column.
    LeafPredicate equalsOne =
            new LeafPredicate(
                    Equal.INSTANCE,
                    DataTypes.DOUBLE(),
                    0,
                    "c",
                    Collections.singletonList(1.0D));
    List<Predicate> filters =
            SchemaEvolutionUtil.createDataFilters(
                    tableFields2, dataFields, Collections.singletonList((Predicate) equalsOne));
    assert filters != null;
    assertThat(filters.size()).isEqualTo(1);

    LeafPredicate devolved = (LeafPredicate) filters.get(0);
    // The devolved predicate evaluates against data-schema rows: value 1 at index 1 matches.
    assertThat(devolved.test(GenericRow.of(0, 1))).isTrue();
    // Value 2 at index 1 does not match.
    assertThat(devolved.test(GenericRow.of(1, 2))).isFalse();
}
}
Loading

0 comments on commit 0b684ca

Please sign in to comment.