diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java index 2cb7319faabc..b0d9524adc17 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -118,11 +119,33 @@ public static Optional> castLiteralsWithEvolution( } if (castRule instanceof NumericPrimitiveCastRule) { + // Ignore float literals because pushing down float filter result is unpredictable. + // For example, (double) 0.1F in Java is 0.10000000149011612. + if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) && outputType.is(DataTypeFamily.INTEGER_NUMERIC)) { + // Ignore input scale < output scale because of overflow. + // For example, alter 383 from INT to TINYINT, the query result is (byte) 383 == + // 127. If we push down filter f = 127, 383 will be filtered out mistakenly. + if (integerScaleLargerThan(inputType.getTypeRoot(), outputType.getTypeRoot())) { - // Pushing down higher scale integer numeric filter is always correct. - return Optional.of(literals); + if (inputType.getTypeRoot() != DataTypeRoot.BIGINT) { + return Optional.of(literals); + } + + // Parquet filter Int comparator cannot handle long value. + // See org.apache.parquet.schema.PrimitiveType. + // So ignore filter if long literal is out of int scale. + List newLiterals = new ArrayList<>(literals.size()); + for (Object literal : literals) { + long originalValue = (long) literal; + int newValue = (int) originalValue; + if (originalValue != newValue) { + return Optional.empty(); + } + newLiterals.add(newValue); + } + return Optional.of(newLiterals); } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java index 277c68746d58..3b12ceabe2da 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -186,6 +186,31 @@ public void testNumericPrimitive() { assertThat(sql("SELECT * FROM T WHERE f = 383")).isEmpty(); assertThat(sql("SELECT * FROM T WHERE f <> 127")).isEmpty(); assertThat(sql("SELECT * FROM T WHERE f <> 383")).containsExactly(Row.of(1, 127)); + + sql("DROP TABLE T"); + + // INT to BIGINT + sql( + "CREATE TABLE T (" + + " id INT," + + " f INT" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + // (int) Integer.MAX_VALUE + 1 == Integer.MIN_VALUE -> (int) 2147483648L == -2147483648 + sql("INSERT INTO T VALUES (1, 2147483647), (2, -2147483648)"); + sql("ALTER TABLE T MODIFY (f BIGINT)"); + assertThat(sql("SELECT * FROM T WHERE f < 2147483648")) + .containsExactlyInAnyOrder(Row.of(1, 2147483647L), Row.of(2, -2147483648L)); + assertThat(sql("SELECT * FROM T WHERE f > 2147483648")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 2147483647")) + .containsExactly(Row.of(1, 2147483647L)); + assertThat(sql("SELECT * FROM T WHERE f = 2147483648")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f <> 2147483647")) + .containsExactly(Row.of(2, -2147483648L)); + assertThat(sql("SELECT * FROM T WHERE f <> 2147483648")) + .containsExactlyInAnyOrder(Row.of(1, 2147483647L), Row.of(2, -2147483648L)); } @TestTemplate