Skip to content

Commit

Permalink
[core] Unify Partition Predicate generation in PartitionPredicate (#3427
Browse files Browse the repository at this point in the history
)
  • Loading branch information
JingsongLi authored May 30, 2024
1 parent b510fa8 commit 7c7fb20
Show file tree
Hide file tree
Showing 22 changed files with 196 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.predicate;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.Timestamp;
Expand All @@ -28,8 +27,6 @@
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.TypeUtils;

import javax.annotation.Nullable;

Expand All @@ -52,6 +49,7 @@
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;

/**
* A utility class to create {@link Predicate} object for common filter conditions.
Expand Down Expand Up @@ -360,14 +358,15 @@ public static boolean containsFields(Predicate predicate, Set<String> fields) {
}

@Nullable
public static Predicate partition(Map<String, String> map, RowType rowType) {
// TODO: It is somewhat misleading that an empty map creates a null predicate filter
public static Predicate partition(
Map<String, String> map, RowType rowType, String defaultPartValue) {
Map<String, Object> internalValues = convertSpecToInternal(map, rowType, defaultPartValue);
List<String> fieldNames = rowType.getFieldNames();
Predicate predicate = null;
PredicateBuilder builder = new PredicateBuilder(rowType);
for (Map.Entry<String, String> entry : map.entrySet()) {
for (Map.Entry<String, Object> entry : internalValues.entrySet()) {
int idx = fieldNames.indexOf(entry.getKey());
Object literal = TypeUtils.castFromString(entry.getValue(), rowType.getTypeAt(idx));
Object literal = internalValues.get(entry.getKey());
Predicate predicateTemp =
literal == null ? builder.isNull(idx) : builder.equal(idx, literal);
if (predicate == null) {
Expand All @@ -379,32 +378,11 @@ public static Predicate partition(Map<String, String> map, RowType rowType) {
return predicate;
}

public static Predicate partitions(List<Map<String, String>> partitions, RowType rowType) {
public static Predicate partitions(
List<Map<String, String>> partitions, RowType rowType, String defaultPartValue) {
return PredicateBuilder.or(
partitions.stream()
.map(p -> PredicateBuilder.partition(p, rowType))
.map(p -> PredicateBuilder.partition(p, rowType, defaultPartValue))
.toArray(Predicate[]::new));
}

public static Predicate equalPartition(BinaryRow partition, RowType partitionType) {
Preconditions.checkArgument(
partition.getFieldCount() == partitionType.getFieldCount(),
"Partition's field count should be equal to partitionType's field count.");

RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType);
Predicate predicate = null;
PredicateBuilder builder = new PredicateBuilder(partitionType);
Object[] literals = converter.convert(partition);
for (int i = 0; i < literals.length; i++) {
Predicate predicateTemp =
literals[i] == null ? builder.isNull(i) : builder.equal(i, literals[i]);
if (predicate == null) {
predicate = predicateTemp;
} else {
predicate = PredicateBuilder.and(predicate, predicateTemp);
}
}

return predicate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.utils.TypeUtils.castFromString;

/** PartitionComputer for {@link InternalRow}. */
public class RowDataPartitionComputer {
public class InternalRowPartitionComputer {

protected final String defaultPartValue;
protected final String[] partitionColumns;
protected final InternalRow.FieldGetter[] partitionFieldGetters;

public RowDataPartitionComputer(
public InternalRowPartitionComputer(
String defaultPartValue, RowType rowType, String[] partitionColumns) {
this.defaultPartValue = defaultPartValue;
this.partitionColumns = partitionColumns;
Expand Down Expand Up @@ -60,4 +63,18 @@ public LinkedHashMap<String, String> generatePartValues(InternalRow in) {
}
return partSpec;
}

public static Map<String, Object> convertSpecToInternal(
Map<String, String> spec, RowType partType, String defaultPartValue) {
Map<String, Object> partValues = new LinkedHashMap<>();
for (Map.Entry<String, String> entry : spec.entrySet()) {
partValues.put(
entry.getKey(),
defaultPartValue.equals(entry.getValue())
? null
: castFromString(
entry.getValue(), partType.getField(entry.getKey()).type()));
}
return partValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@

package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

/** Convert {@link InternalRow} to object array. */
Expand Down Expand Up @@ -68,15 +63,4 @@ public Object[] convert(InternalRow rowData) {
}
return result;
}

public Predicate createEqualPredicate(BinaryRow binaryRow) {
PredicateBuilder builder = new PredicateBuilder(rowType);
List<Predicate> fieldPredicates = new ArrayList<>();
Object[] partitionObjects = convert(binaryRow);
for (int i = 0; i < getArity(); i++) {
Object o = partitionObjects[i];
fieldPredicates.add(o == null ? builder.isNull(i) : builder.equal(i, o));
}
return PredicateBuilder.and(fieldPredicates);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public FileStoreCommitImpl newCommit(String commitUser) {
schemaManager,
commitUser,
partitionType,
options.partitionDefaultName(),
pathFactory(),
snapshotManager(),
manifestFileFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Metadata of a manifest file. */
Expand Down Expand Up @@ -405,7 +406,8 @@ private static Optional<Predicate> convertPartitionToPredicate(

List<Predicate> predicateList =
partitions.stream()
.map(rowArrayConverter::createEqualPredicate)
.map(rowArrayConverter::convert)
.map(values -> createPartitionPredicate(partitionType, values))
.collect(Collectors.toList());
predicateOpt = Optional.of(PredicateBuilder.or(predicateList));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;

/**
Expand Down Expand Up @@ -105,6 +106,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final SchemaManager schemaManager;
private final String commitUser;
private final RowType partitionType;
private final String partitionDefaultName;
private final FileStorePathFactory pathFactory;
private final SnapshotManager snapshotManager;
private final ManifestFile manifestFile;
Expand Down Expand Up @@ -133,6 +135,7 @@ public FileStoreCommitImpl(
SchemaManager schemaManager,
String commitUser,
RowType partitionType,
String partitionDefaultName,
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
Expand All @@ -152,6 +155,7 @@ public FileStoreCommitImpl(
this.schemaManager = schemaManager;
this.commitUser = commitUser;
this.partitionType = partitionType;
this.partitionDefaultName = partitionDefaultName;
this.pathFactory = pathFactory;
this.snapshotManager = snapshotManager;
this.manifestFile = manifestFileFactory.create();
Expand Down Expand Up @@ -413,15 +417,16 @@ public void overwrite(
.map(ManifestEntry::partition)
.distinct()
// partition filter is built from new data's partitions
.map(p -> PredicateBuilder.equalPartition(p, partitionType))
.map(p -> createPartitionPredicate(partitionType, p))
.reduce(PredicateBuilder::or)
.orElseThrow(
() ->
new RuntimeException(
"Failed to get dynamic partition filter. This is unexpected."));
}
} else {
partitionFilter = PredicateBuilder.partition(partition, partitionType);
partitionFilter =
createPartitionPredicate(partition, partitionType, partitionDefaultName);
// sanity check, all changes must be done within the given partition
if (partitionFilter != null) {
for (ManifestEntry entry : appendTableFiles) {
Expand Down Expand Up @@ -492,7 +497,10 @@ public void dropPartitions(List<Map<String, String>> partitions, long commitIden

Predicate partitionFilter =
partitions.stream()
.map(partition -> PredicateBuilder.partition(partition, partitionType))
.map(
partition ->
createPartitionPredicate(
partition, partitionType, partitionDefaultName))
.reduce(PredicateBuilder::or)
.orElseThrow(() -> new RuntimeException("Failed to get partition filter."));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.statistics.FullSimpleColStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** A special predicate to filter partition only, just like {@link Predicate}. */
public interface PartitionPredicate {

Expand Down Expand Up @@ -121,11 +127,13 @@ private MultiplePartitionPredicate(
PredicateBuilder builder = new PredicateBuilder(partitionType);
for (int i = 0; i < collectors.length; i++) {
SimpleColStats stats = collectors[i].result();
Object minValue = stats.min();
Object maxValue = stats.max();

min[i] = minValue == null ? builder.isNull(i) : builder.greaterOrEqual(i, minValue);
max[i] = maxValue == null ? builder.isNull(i) : builder.lessOrEqual(i, maxValue);
if (stats.nullCount() == partitions.size()) {
min[i] = builder.isNull(i);
max[i] = builder.isNull(i);
} else {
min[i] = builder.greaterOrEqual(i, checkNotNull(stats.min()));
max[i] = builder.lessOrEqual(i, checkNotNull(stats.max()));
}
}
}

Expand Down Expand Up @@ -153,4 +161,58 @@ public boolean test(
return true;
}
}

static Predicate createPartitionPredicate(RowType rowType, Map<String, Object> partition) {
PredicateBuilder builder = new PredicateBuilder(rowType);
List<String> fieldNames = rowType.getFieldNames();
Predicate predicate = null;
for (Map.Entry<String, Object> entry : partition.entrySet()) {
Object literal = entry.getValue();
int idx = fieldNames.indexOf(entry.getKey());
Predicate predicateTemp =
literal == null ? builder.isNull(idx) : builder.equal(idx, literal);
if (predicate == null) {
predicate = predicateTemp;
} else {
predicate = PredicateBuilder.and(predicate, predicateTemp);
}
}
return predicate;
}

static Predicate createPartitionPredicate(RowType partitionType, Object[] partition) {
Preconditions.checkArgument(
partition.length == partitionType.getFieldCount(),
"Partition's field count should be equal to partitionType's field count.");

Map<String, Object> partitionMap = new HashMap<>(partition.length);
for (int i = 0; i < partition.length; i++) {
partitionMap.put(partitionType.getFields().get(i).name(), partition[i]);
}

return createPartitionPredicate(partitionType, partitionMap);
}

static Predicate createPartitionPredicate(RowType partitionType, BinaryRow partition) {
Preconditions.checkArgument(
partition.getFieldCount() == partitionType.getFieldCount(),
"Partition's field count should be equal to partitionType's field count.");
RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType);
return createPartitionPredicate(partitionType, converter.convert(partition));
}

@Nullable
static Predicate createPartitionPredicate(
Map<String, String> spec, RowType rowType, String defaultPartValue) {
Map<String, Object> internalValues = convertSpecToInternal(spec, rowType, defaultPartValue);
return createPartitionPredicate(rowType, internalValues);
}

static Predicate createPartitionPredicate(
List<Map<String, String>> partitions, RowType rowType, String defaultPartValue) {
return PredicateBuilder.or(
partitions.stream()
.map(p -> createPartitionPredicate(p, rowType, defaultPartValue))
.toArray(Predicate[]::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TypeUtils;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -58,10 +56,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;

/** Implementation of {@link SnapshotReader}. */
Expand Down Expand Up @@ -140,25 +138,12 @@ public SnapshotReader withSnapshot(Snapshot snapshot) {
@Override
public SnapshotReader withPartitionFilter(Map<String, String> partitionSpec) {
if (partitionSpec != null) {
List<String> partitionKeys = tableSchema.partitionKeys();
RowType rowType = tableSchema.logicalPartitionType();
PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
List<Predicate> partitionFilters =
partitionSpec.entrySet().stream()
.map(
m -> {
int index = partitionKeys.indexOf(m.getKey());
Object value =
TypeUtils.castFromStringInternal(
m.getValue(),
rowType.getTypeAt(index),
false);
return value == null
? predicateBuilder.isNull(index)
: predicateBuilder.equal(index, value);
})
.collect(Collectors.toList());
scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
Predicate partitionPredicate =
createPartitionPredicate(
partitionSpec,
tableSchema.logicalPartitionType(),
options.partitionDefaultName());
scan.withPartitionFilter(partitionPredicate);
}
return this;
}
Expand Down
Loading

0 comments on commit 7c7fb20

Please sign in to comment.