From 0b684ca6ccd9d648327a06baf75df9eef3e787cc Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:10:57 +0800 Subject: [PATCH] [core] Fix predicate literals cast in filter pushdown after schema evolution (#4705) --- .../apache/paimon/casting/CastExecutors.java | 55 +++++ .../operation/AppendOnlyFileStoreScan.java | 4 +- .../operation/KeyValueFileStoreScan.java | 2 +- .../paimon/schema/SchemaEvolutionUtil.java | 41 ++- .../paimon/stats/SimpleStatsEvolutions.java | 21 +- .../paimon/utils/FormatReaderMapping.java | 2 +- .../schema/SchemaEvolutionUtilTest.java | 28 +-- .../FilterPushdownWithSchemaChangeITCase.java | 233 ++++++++++++++++++ 8 files changed, 322 insertions(+), 64 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java index 8134e0118bf8..546066d10aa3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java @@ -24,8 +24,10 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -100,6 +102,59 @@ public class CastExecutors { return IDENTITY_CAST_EXECUTOR; } + /** + * If a field type is modified, pushing down a filter of it is dangerous. This method tries to + * cast the literals of filter to its original type. It only cast the literals when the CastRule + * is in whitelist. Otherwise, return Optional.empty(). + */ + public static Optional> castLiteralsWithEvolution( + List literals, DataType predicateType, DataType dataType) { + if (predicateType.equalsIgnoreNullable(dataType)) { + return Optional.of(literals); + } + + CastRule castRule = INSTANCE.internalResolve(predicateType, dataType); + if (castRule == null) { + return Optional.empty(); + } + + if (castRule instanceof NumericPrimitiveCastRule) { + // Ignore float literals because pushing down float filter result is unpredictable. + // For example, (double) 0.1F in Java is 0.10000000149011612. + + if (predicateType.is(DataTypeFamily.INTEGER_NUMERIC) + && dataType.is(DataTypeFamily.INTEGER_NUMERIC)) { + // Ignore input scale < output scale because of overflow. + // For example, alter 383 from INT to TINYINT, the query result is (byte) 383 == + // 127. If we push down filter f = 127, 383 will be filtered out mistakenly. + + if (integerScaleLargerThan(predicateType.getTypeRoot(), dataType.getTypeRoot())) { + CastExecutor castExecutor = + (CastExecutor) castRule.create(predicateType, dataType); + List newLiterals = new ArrayList<>(literals.size()); + for (Object literal : literals) { + Number literalNumber = (Number) literal; + Number newLiteralNumber = castExecutor.cast(literalNumber); + // Ignore if any literal is overflowed. + if (newLiteralNumber.longValue() != literalNumber.longValue()) { + return Optional.empty(); + } + newLiterals.add(newLiteralNumber); + } + return Optional.of(newLiterals); + } + } + } + + return Optional.empty(); + } + + private static boolean integerScaleLargerThan(DataTypeRoot a, DataTypeRoot b) { + return (a == DataTypeRoot.SMALLINT && b == DataTypeRoot.TINYINT) + || (a == DataTypeRoot.INTEGER && b != DataTypeRoot.BIGINT) + || a == DataTypeRoot.BIGINT; + } + // Map> private final Map>> rules = new HashMap<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index d2ca5da42249..1498e08a2b20 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -109,7 +109,9 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry Predicate dataPredicate = dataFilterMapping.computeIfAbsent( entry.file().schemaId(), - id -> simpleStatsEvolutions.convertFilter(entry.file().schemaId(), filter)); + id -> + simpleStatsEvolutions.tryDevolveFilter( + entry.file().schemaId(), filter)); try (FileIndexPredicate predicate = new FileIndexPredicate(embeddedIndexBytes, dataRowType)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 8d8c51996cfe..e39ad2e3c2e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -158,7 +158,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE schemaId2DataFilter.computeIfAbsent( entry.file().schemaId(), id -> - fieldValueStatsConverters.convertFilter( + fieldValueStatsConverters.tryDevolveFilter( entry.file().schemaId(), valueFilter)); return predicate.evaluate(dataPredicate).remain(); } catch (IOException e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index 0ae2798c29e0..cab5dcaeb8ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -276,8 +276,10 @@ public static int[][] createDataProjection( } /** - * Create predicate list from data fields. We will visit all predicate in filters, reset it's - * field index, name and type, and ignore predicate if the field is not exist. + * When pushing down filters after schema evolution, we should devolve the literals from new + * types (in dataFields) to original types (in tableFields). We will visit all predicate in + * filters, reset its field index, name and type, and ignore predicate if the field is not + * exist. * * @param tableFields the table fields * @param dataFields the underlying data fields @@ -285,7 +287,7 @@ public static int[][] createDataProjection( * @return the data filters */ @Nullable - public static List createDataFilters( + public static List devolveDataFilters( List tableFields, List dataFields, List filters) { if (filters == null) { return null; @@ -308,29 +310,16 @@ public static List createDataFilters( return Optional.empty(); } - DataType dataValueType = dataField.type().copy(true); - DataType predicateType = predicate.type().copy(true); - CastExecutor castExecutor = - dataValueType.equals(predicateType) - ? null - : (CastExecutor) - CastExecutors.resolve( - predicate.type(), dataField.type()); - // Convert value from predicate type to underlying data type which may lose - // information, for example, convert double value to int. But it doesn't matter - // because it just for predicate push down and the data will be filtered - // correctly after reading. - List literals = - predicate.literals().stream() - .map(v -> castExecutor == null ? v : castExecutor.cast(v)) - .collect(Collectors.toList()); - return Optional.of( - new LeafPredicate( - predicate.function(), - dataField.type(), - indexOf(dataField, idToDataFields), - dataField.name(), - literals)); + return CastExecutors.castLiteralsWithEvolution( + predicate.literals(), predicate.type(), dataField.type()) + .map( + literals -> + new LeafPredicate( + predicate.function(), + dataField.type(), + indexOf(dataField, idToDataFields), + dataField.name(), + literals)); }; for (Predicate predicate : filters) { diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index a0814b8c04c4..566cae9e6592 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -77,15 +77,18 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) { }); } - public Predicate convertFilter(long dataSchemaId, Predicate filter) { - return tableSchemaId == dataSchemaId - ? filter - : Objects.requireNonNull( - SchemaEvolutionUtil.createDataFilters( - schemaFields.apply(tableSchemaId), - schemaFields.apply(dataSchemaId), - Collections.singletonList(filter))) - .get(0); + @Nullable + public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) { + if (tableSchemaId == dataSchemaId) { + return filter; + } + List devolved = + Objects.requireNonNull( + SchemaEvolutionUtil.devolveDataFilters( + schemaFields.apply(tableSchemaId), + schemaFields.apply(dataSchemaId), + Collections.singletonList(filter))); + return devolved.isEmpty() ? null : devolved.get(0); } public List tableDataFields() { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index f6c6287f51b4..00554b233c59 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -290,7 +290,7 @@ private List readFilters( List dataFilters = tableSchema.id() == dataSchema.id() ? filters - : SchemaEvolutionUtil.createDataFilters( + : SchemaEvolutionUtil.devolveDataFilters( tableSchema.fields(), dataSchema.fields(), filters); // Skip pushing down partition filters to reader. diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index 9d947f76d995..30d844e6c606 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -20,7 +20,6 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.predicate.Equal; import org.apache.paimon.predicate.IsNotNull; import org.apache.paimon.predicate.IsNull; import org.apache.paimon.predicate.LeafPredicate; @@ -263,7 +262,7 @@ public void testCreateDataProjection() { } @Test - public void testCreateDataFilters() { + public void testDevolveDataFilters() { List predicates = new ArrayList<>(); predicates.add( new LeafPredicate( @@ -278,7 +277,7 @@ public void testCreateDataFilters() { IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList())); List filters = - SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates); + SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates); assert filters != null; assertThat(filters.size()).isEqualTo(1); @@ -287,27 +286,4 @@ public void testCreateDataFilters() { assertThat(child1.fieldName()).isEqualTo("b"); assertThat(child1.index()).isEqualTo(1); } - - @Test - public void testColumnTypeFilter() { - // (1, b, int) in data schema is updated to (1, c, double) in table2 - List predicates = new ArrayList<>(); - predicates.add( - new LeafPredicate( - Equal.INSTANCE, - DataTypes.DOUBLE(), - 0, - "c", - Collections.singletonList(1.0D))); - List filters = - SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates); - assert filters != null; - assertThat(filters.size()).isEqualTo(1); - - LeafPredicate child = (LeafPredicate) filters.get(0); - // Validate value 1 with index 1 - assertThat(child.test(GenericRow.of(0, 1))).isTrue(); - // Validate value 2 with index 1 - assertThat(child.test(GenericRow.of(1, 2))).isFalse(); - } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java new file mode 100644 index 000000000000..3b12ceabe2da --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.casting.CastExecutors; +import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; +import org.apache.paimon.testutils.junit.parameterized.Parameters; +import org.apache.paimon.types.DataType; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase for {@link CastExecutors#castLiteralsWithEvolution(List, DataType, DataType)}. */ +@ExtendWith(ParameterizedTestExtension.class) +public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase { + + private final String fileFormat; + + public FilterPushdownWithSchemaChangeITCase(String fileFormat) { + this.fileFormat = fileFormat; + } + + @SuppressWarnings("unused") + @Parameters(name = "file-format = {0}") + public static List fileFormat() { + return Arrays.asList("parquet", "orc", "avro"); + } + + @TestTemplate + public void testDecimalToDecimal() { + // to higher precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(5, 2)" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, 111.32)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(6, 3))"); + assertThat(sql("SELECT * FROM T WHERE f < 111.321")) + .containsExactly(Row.of(1, new BigDecimal("111.320"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.321")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111.320")) + .containsExactly(Row.of(1, new BigDecimal("111.320"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.321")) + .containsExactly(Row.of(1, new BigDecimal("111.320"))); + + sql("DROP TABLE T"); + + // to lower precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(6, 3)" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, 111.321), (2, 111.331)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(5, 2))"); + assertThat(sql("SELECT * FROM T WHERE f > 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.32")) + .containsExactly(Row.of(1, new BigDecimal("111.32"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + } + + @TestTemplate + public void testNumericPrimitiveToDecimal() { + String ddl = + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(5, 2)" + + ") with (" + + " 'file.format' = '%s'" + + ")"; + + // to higher precision + sql(ddl, fileFormat); + sql("INSERT INTO T VALUES (1, 111.32)"); + sql("ALTER TABLE T MODIFY (f DOUBLE)"); + assertThat(sql("SELECT * FROM T WHERE f < 111.321")).containsExactly(Row.of(1, 111.32)); + assertThat(sql("SELECT * FROM T WHERE f = 111.321")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111.320")).containsExactly(Row.of(1, 111.32)); + assertThat(sql("SELECT * FROM T WHERE f <> 111.321")).containsExactly(Row.of(1, 111.32)); + + sql("DROP TABLE T"); + + // to lower precision + sql(ddl, fileFormat); + sql("INSERT INTO T VALUES (1, 111.32), (2, 112.33)"); + sql("ALTER TABLE T MODIFY (f INT)"); + assertThat(sql("SELECT * FROM T WHERE f < 112")).containsExactly(Row.of(1, 111)); + assertThat(sql("SELECT * FROM T WHERE f > 112")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111")).containsExactly(Row.of(1, 111)); + assertThat(sql("SELECT * FROM T WHERE f <> 111")).containsExactly(Row.of(2, 112)); + } + + @TestTemplate + public void testDecimalToNumericPrimitive() { + // to higher precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f INT" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, 111)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(5, 2))"); + assertThat(sql("SELECT * FROM T WHERE f < 111.01")) + .containsExactly(Row.of(1, new BigDecimal("111.00"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.01")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111.00")) + .containsExactly(Row.of(1, new BigDecimal("111.00"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.01")) + .containsExactly(Row.of(1, new BigDecimal("111.00"))); + + sql("DROP TABLE T"); + + // to lower precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f DOUBLE" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, 111.321), (2, 111.331)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(5, 2))"); + assertThat(sql("SELECT * FROM T WHERE f > 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.32")) + .containsExactly(Row.of(1, new BigDecimal("111.32"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + } + + @TestTemplate + public void testNumericPrimitive() { + // no checks for high scale to low scale because we don't pushdown it + + // integer to higher scale integer + sql( + "CREATE TABLE T (" + + " id INT," + + " f TINYINT" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, CAST (127 AS TINYINT))"); + sql("ALTER TABLE T MODIFY (f INT)"); + // (byte) 383 == 127 + assertThat(sql("SELECT * FROM T WHERE f < 128")).containsExactly(Row.of(1, 127)); + assertThat(sql("SELECT * FROM T WHERE f < 383")).containsExactly(Row.of(1, 127)); + assertThat(sql("SELECT * FROM T WHERE f = 127")).containsExactly(Row.of(1, 127)); + assertThat(sql("SELECT * FROM T WHERE f = 383")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f <> 127")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f <> 383")).containsExactly(Row.of(1, 127)); + + sql("DROP TABLE T"); + + // INT to BIGINT + sql( + "CREATE TABLE T (" + + " id INT," + + " f INT" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + // (int) Integer.MAX_VALUE + 1 == Integer.MIN_VALUE -> (int) 2147483648L == -2147483648 + sql("INSERT INTO T VALUES (1, 2147483647), (2, -2147483648)"); + sql("ALTER TABLE T MODIFY (f BIGINT)"); + assertThat(sql("SELECT * FROM T WHERE f < 2147483648")) + .containsExactlyInAnyOrder(Row.of(1, 2147483647L), Row.of(2, -2147483648L)); + assertThat(sql("SELECT * FROM T WHERE f > 2147483648")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 2147483647")) + .containsExactly(Row.of(1, 2147483647L)); + assertThat(sql("SELECT * FROM T WHERE f = 2147483648")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f <> 2147483647")) + .containsExactly(Row.of(2, -2147483648L)); + assertThat(sql("SELECT * FROM T WHERE f <> 2147483648")) + .containsExactlyInAnyOrder(Row.of(1, 2147483647L), Row.of(2, -2147483648L)); + } + + @TestTemplate + public void testNumericToString() { + // no more string related tests because we don't push down it + sql( + "CREATE TABLE T (" + + " id INT," + + " f STRING" + + ") with (" + + " 'file.format' = '%s'" + + ");", + fileFormat); + sql("INSERT INTO T VALUES (1, '1'), (2, '111')"); + sql("ALTER TABLE T MODIFY (f INT)"); + assertThat(sql("SELECT * FROM T WHERE f > 2")).containsExactly(Row.of(2, 111)); + assertThat(sql("SELECT * FROM T WHERE f = 1")).containsExactly(Row.of(1, 1)); + assertThat(sql("SELECT * FROM T WHERE f <> 1")).containsExactly(Row.of(2, 111)); + } +}