From e1fa0b055275697b15e317460d5f670363b5c2ca Mon Sep 17 00:00:00 2001
From: yuzelin <33053040+yuzelin@users.noreply.github.com>
Date: Wed, 23 Oct 2024 18:31:27 +0800
Subject: [PATCH] [format] Fix ParquetFilters cannot handle tinyint and smallint correctly (#4365)

---
 .../apache/paimon/flink/PredicateITCase.java  | 38 +++++++++++++++++++
 .../filter2/predicate/ParquetFilters.java     |  5 +++
 2 files changed, 43 insertions(+)

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateITCase.java
index f50f7db60f02..0668c17eea7c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateITCase.java
@@ -21,6 +21,8 @@
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
+import java.util.concurrent.ThreadLocalRandom;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Predicate ITCase. */
@@ -50,6 +52,42 @@ public void testAppendFilterBucket() throws Exception {
         innerTestAllFields();
     }
 
+    @Test
+    public void testIntegerFilter() { // equality filters on TINYINT, SMALLINT, INT and BIGINT
+        int rand = ThreadLocalRandom.current().nextInt(3); // 0, 1 or 2
+        String fileFormat; // exactly one format is exercised per run, chosen by rand
+        if (rand == 0) {
+            fileFormat = "avro";
+        } else if (rand == 1) {
+            fileFormat = "parquet";
+        } else {
+            fileFormat = "orc";
+        }
+
+        sql(
+                "CREATE TABLE T ("
+                        + "a TINYINT,"
+                        + "b SMALLINT,"
+                        + "c INT,"
+                        + "d BIGINT"
+                        + ") WITH ("
+                        + "'file.format' = '%s'"
+                        + ")",
+                fileFormat);
+        sql(
+                "INSERT INTO T VALUES (CAST (1 AS TINYINT), CAST (1 AS SMALLINT), 1, 1), "
+                        + "(CAST (2 AS TINYINT), CAST (2 AS SMALLINT), 2, 2)");
+
+        Row expectedResult = Row.of((byte) 1, (short) 1, 1, 1L); // the all-ones row inserted above
+        assertThat(sql("SELECT * FROM T WHERE a = CAST (1 AS TINYINT)"))
+                .containsExactly(expectedResult);
+        assertThat(sql("SELECT * FROM T WHERE b = CAST (1 AS SMALLINT)"))
+                .containsExactly(expectedResult);
+        assertThat(sql("SELECT * FROM T WHERE c = 1")).containsExactly(expectedResult);
+        assertThat(sql("SELECT * FROM T WHERE d = CAST (1 AS BIGINT)"))
+                .containsExactly(expectedResult);
+    }
+
     private void writeRecords() throws Exception {
         sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
     }
diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
index cacc241fd24b..96cf2fe726cf 100644
--- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
+++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
@@ -172,6 +172,11 @@ private static Comparable toParquetObject(Object value) {
         }
 
         if (value instanceof Number) {
+            if (value instanceof Byte) { // Byte literals are returned as Integer, not as Byte
+                return ((Byte) value).intValue();
+            } else if (value instanceof Short) { // Short literals are widened the same way
+                return ((Short) value).intValue(); // NOTE(review): presumably INT32 on the Parquet side - confirm
+            }
             return (Comparable) value;
         } else if (value instanceof String) {
             return Binary.fromString((String) value);