From af4294fe64d70f327c11333674771e676e6d6e48 Mon Sep 17 00:00:00 2001
From: Zouxxyy
Date: Wed, 26 Jun 2024 14:29:32 +0800
Subject: [PATCH] [core] Skip pushing down partition filter to file reader (#3608)

---
 .../paimon/predicate/PredicateBuilder.java    | 10 +++++++
 .../paimon/io/KeyValueFileReaderFactory.java  |  9 +++++-
 .../paimon/operation/RawFileSplitRead.java    |  7 ++++-
 .../procedure/MigrateTableProcedureTest.scala | 30 +++++++++++++++++++
 4 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index acb861b1cd58..385775736c2c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -357,6 +357,16 @@ public static boolean containsFields(Predicate predicate, Set fields) {
         }
     }
 
+    public static List excludePredicateWithFields(
+            @Nullable List predicates, Set fields) {
+        if (predicates == null || predicates.isEmpty() || fields.isEmpty()) {
+            return predicates;
+        }
+        return predicates.stream()
+                .filter(f -> !containsFields(f, fields))
+                .collect(Collectors.toList());
+    }
+
     @Nullable
     public static Predicate partition(
             Map map, RowType rowType, String defaultPartValue) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 21e8c52118b9..af9b31dd3f6a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -51,11 +51,14 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Supplier;
 
+import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
+
 /** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
 public class KeyValueFileReaderFactory implements FileReaderFactory {
 
@@ -375,6 +378,10 @@ public BulkFormatMapping build(
                             ? filters
                             : SchemaEvolutionUtil.createDataFilters(
                                     tableSchema.fields(), dataSchema.fields(), filters);
+            // Skip pushing down partition filters to reader
+            List nonPartitionFilters =
+                    excludePredicateWithFields(
+                            dataFilters, new HashSet<>(dataSchema.partitionKeys()));
 
             Pair partitionPair = null;
             if (!dataSchema.partitionKeys().isEmpty()) {
@@ -397,7 +404,7 @@ public BulkFormatMapping build(
                     partitionPair,
                     formatDiscover
                             .discover(formatIdentifier)
-                            .createReaderFactory(projectedRowType, dataFilters));
+                            .createReaderFactory(projectedRowType, nonPartitionFilters));
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index efed168afe24..7e6e91731d72 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -57,10 +57,12 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 
 /** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
@@ -181,6 +183,9 @@ private RawFileBulkFormatMapping createBulkFormatMapping(FormatKey key) {
                         ? filters
                         : SchemaEvolutionUtil.createDataFilters(
                                 tableSchema.fields(), dataSchema.fields(), filters);
+        // Skip pushing down partition filters to reader
+        List nonPartitionFilters =
+                excludePredicateWithFields(dataFilters, new HashSet<>(dataSchema.partitionKeys()));
 
         Pair partitionPair = null;
         if (!dataSchema.partitionKeys().isEmpty()) {
@@ -205,7 +210,7 @@ private RawFileBulkFormatMapping createBulkFormatMapping(FormatKey key) {
                 partitionPair,
                 formatDiscover
                         .discover(key.format)
-                        .createReaderFactory(projectedRowType, dataFilters),
+                        .createReaderFactory(projectedRowType, nonPartitionFilters),
                 dataSchema,
                 dataFilters);
     }
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
index 710d4dbfa8d3..538de53dd62f 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
@@ -141,4 +141,34 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
         .hasMessageContaining("Hive migrator only support unaware-bucket target table")
     }
   }
+
+  test(s"Paimon migrate table procedure: select migrate table with partition filter") {
+    Seq("parquet", "orc", "avro").foreach(
+      format => {
+        withTable("migrate_tbl") {
+          spark.sql(s"""
+                       |CREATE TABLE migrate_tbl (id STRING, name STRING, pt INT)
+                       |USING $format
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+
+          spark.sql(s"INSERT INTO migrate_tbl VALUES ('1', 'a', 1), ('2', 'b', 2)")
+
+          spark.sql(
+            s"""CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.migrate_tbl',
+               |options => 'file.format=$format')
+               |""".stripMargin)
+
+          checkAnswer(
+            spark.sql(s"SELECT * FROM migrate_tbl WHERE pt = 1 ORDER BY id"),
+            Row("1", "a", 1) :: Nil)
+          checkAnswer(
+            spark.sql(s"SELECT * FROM migrate_tbl WHERE pt IS NOT null ORDER BY id"),
+            Row("1", "a", 1) :: Row("2", "b", 2) :: Nil)
+          checkAnswer(
+            spark.sql(s"SELECT * FROM migrate_tbl WHERE name LIKE 'a%' OR pt IS null ORDER BY id"),
+            Row("1", "a", 1) :: Nil)
+        }
+      })
+  }
 }