[core] Skip pushing down partition filter to file reader (apache#3608)
Zouxxyy authored Jun 26, 2024
1 parent c60eb5e · commit af4294f
Showing 4 changed files with 54 additions and 2 deletions.
PredicateBuilder.java

@@ -357,6 +357,16 @@ public static boolean containsFields(Predicate predicate, Set<String> fields) {
         }
     }
 
+    public static List<Predicate> excludePredicateWithFields(
+            @Nullable List<Predicate> predicates, Set<String> fields) {
+        if (predicates == null || predicates.isEmpty() || fields.isEmpty()) {
+            return predicates;
+        }
+        return predicates.stream()
+                .filter(f -> !containsFields(f, fields))
+                .collect(Collectors.toList());
+    }
+
     @Nullable
     public static Predicate partition(
             Map<String, String> map, RowType rowType, String defaultPartValue) {
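
For context, here is a minimal usage sketch of the new helper (not part of this commit; it assumes the existing PredicateBuilder, RowType, and DataTypes APIs behave as in current Paimon). Given the filters name = 'a' and pt = 1 over a table partitioned by pt, only the non-partition predicate survives:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class ExcludePartitionFilterSketch {
    public static void main(String[] args) {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.STRING(), DataTypes.INT()},
                        new String[] {"name", "pt"});
        PredicateBuilder builder = new PredicateBuilder(rowType);

        List<Predicate> filters =
                Arrays.asList(
                        builder.equal(0, BinaryString.fromString("a")), // name = 'a'
                        builder.equal(1, 1)); // pt = 1, references the partition key

        List<Predicate> nonPartitionFilters =
                PredicateBuilder.excludePredicateWithFields(
                        filters, Collections.singleton("pt"));

        // Only name = 'a' remains; pt = 1 is excluded from reader push-down.
        System.out.println(nonPartitionFilters.size()); // 1
    }
}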
KeyValueFileReaderFactory.java

@@ -51,11 +51,14 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Supplier;
 
+import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
+
 /** Factory to create {@link RecordReader}s for reading {@link KeyValue} files. */
 public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
 
@@ -375,6 +378,10 @@ public BulkFormatMapping build(
                             ? filters
                             : SchemaEvolutionUtil.createDataFilters(
                                     tableSchema.fields(), dataSchema.fields(), filters);
+            // Skip pushing down partition filters to reader
+            List<Predicate> nonPartitionFilters =
+                    excludePredicateWithFields(
+                            dataFilters, new HashSet<>(dataSchema.partitionKeys()));
 
             Pair<int[], RowType> partitionPair = null;
             if (!dataSchema.partitionKeys().isEmpty()) {
@@ -397,7 +404,7 @@ public BulkFormatMapping build(
                     partitionPair,
                     formatDiscover
                             .discover(formatIdentifier)
-                            .createReaderFactory(projectedRowType, dataFilters));
+                            .createReaderFactory(projectedRowType, nonPartitionFilters));
         }
     }
 }
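
Why the partition filter must be kept away from the format reader: files migrated from Hive do not store partition columns at all; the value lives only in the directory path and is spliced back into rows through the partitionPair mapping built above. A reader that evaluated pt = 1 against such a file would presumably see a missing column and drop every row. A toy illustration of that failure mode (hypothetical code, not from Paimon):

import java.util.HashMap;
import java.util.Map;

public class PartitionPushdownPitfall {
    public static void main(String[] args) {
        // A row as it exists inside a migrated data file: no "pt" column,
        // since Hive keeps the partition value only in the path (.../pt=1/).
        Map<String, Object> rowInFile = new HashMap<>();
        rowInFile.put("id", "1");
        rowInFile.put("name", "a");

        // A reader that pushed down `pt = 1` evaluates it against a missing column:
        Object pt = rowInFile.get("pt"); // null
        boolean keptByReader = Integer.valueOf(1).equals(pt); // false: row wrongly dropped

        // The fix in this commit: keep partition predicates out of the reader and
        // attach the partition value after reading, as the partitionPair mapping does.
        rowInFile.put("pt", 1); // recovered from the file path
        System.out.println(keptByReader + " -> " + rowInFile);
    }
}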
RawFileSplitRead.java

@@ -57,10 +57,12 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 
 /** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
@@ -181,6 +183,9 @@ private RawFileBulkFormatMapping createBulkFormatMapping(FormatKey key) {
                         ? filters
                         : SchemaEvolutionUtil.createDataFilters(
                                 tableSchema.fields(), dataSchema.fields(), filters);
+        // Skip pushing down partition filters to reader
+        List<Predicate> nonPartitionFilters =
+                excludePredicateWithFields(dataFilters, new HashSet<>(dataSchema.partitionKeys()));
 
         Pair<int[], RowType> partitionPair = null;
         if (!dataSchema.partitionKeys().isEmpty()) {
@@ -205,7 +210,7 @@ private RawFileBulkFormatMapping createBulkFormatMapping(FormatKey key) {
                 partitionPair,
                 formatDiscover
                         .discover(key.format)
-                        .createReaderFactory(projectedRowType, dataFilters),
+                        .createReaderFactory(projectedRowType, nonPartitionFilters),
                 dataSchema,
                 dataFilters);
     }
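
Taken together with splitAnd (imported above), both read paths now follow roughly the same filter pipeline, sketched below (again not from the commit; it assumes PredicateBuilder.and and splitAnd are the existing helpers for composing and splitting conjunctions): split the pushed-down conjunction into individual predicates, keep the full list for plan-time partition pruning, and hand the reader only the non-partition remainder.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;

public class FilterPipelineSketch {
    public static void main(String[] args) {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.STRING(), DataTypes.INT()},
                        new String[] {"name", "pt"});
        PredicateBuilder builder = new PredicateBuilder(rowType);

        // name = 'a' AND pt = 1, as a single pushed-down predicate.
        Predicate pushed =
                PredicateBuilder.and(
                        builder.equal(0, BinaryString.fromString("a")),
                        builder.equal(1, 1));

        List<Predicate> filters = splitAnd(pushed); // [name = 'a', pt = 1]
        List<Predicate> readerFilters =
                excludePredicateWithFields(filters, new HashSet<>(Arrays.asList("pt")));

        // readerFilters: [name = 'a']. pt = 1 is still honored through
        // plan-time partition pruning, just never inside the file reader.
        System.out.println(readerFilters.size()); // 1
    }
}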
MigrateTableProcedureTest.scala

@@ -141,4 +141,34 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
           .hasMessageContaining("Hive migrator only support unaware-bucket target table")
     }
   }
+
+  test(s"Paimon migrate table procedure: select migrate table with partition filter") {
+    Seq("parquet", "orc", "avro").foreach(
+      format => {
+        withTable("migrate_tbl") {
+          spark.sql(s"""
+                       |CREATE TABLE migrate_tbl (id STRING, name STRING, pt INT)
+                       |USING $format
+                       |PARTITIONED BY (pt)
+                       |""".stripMargin)
+
+          spark.sql(s"INSERT INTO migrate_tbl VALUES ('1', 'a', 1), ('2', 'b', 2)")
+
+          spark.sql(
+            s"""CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.migrate_tbl',
+               |options => 'file.format=$format')
+               |""".stripMargin)
+
+          checkAnswer(
+            spark.sql(s"SELECT * FROM migrate_tbl WHERE pt = 1 ORDER BY id"),
+            Row("1", "a", 1) :: Nil)
+          checkAnswer(
+            spark.sql(s"SELECT * FROM migrate_tbl WHERE pt IS NOT null ORDER BY id"),
+            Row("1", "a", 1) :: Row("2", "b", 2) :: Nil)
+          checkAnswer(
+            spark.sql(s"SELECT * FROM migrate_tbl WHERE name LIKE 'a%' OR pt IS null ORDER BY id"),
+            Row("1", "a", 1) :: Nil)
+        }
+      })
+  }
 }
