Skip to content

Commit

Permalink
[core] Optimize drop partitions to avoid stack overflow (#4663)
Browse files Browse the repository at this point in the history
This closes #4663.
  • Loading branch information
JingsongLi authored Dec 9, 2024
1 parent ed43563 commit 55e4e74
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.types.DataType;
Expand All @@ -32,6 +33,7 @@
import java.util.Map;

import static org.apache.paimon.utils.InternalRowUtils.createNullCheckingFieldGetter;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.TypeUtils.castFromString;

/** PartitionComputer for {@link InternalRow}. */
Expand Down Expand Up @@ -102,6 +104,22 @@ public static Map<String, Object> convertSpecToInternal(
return partValues;
}

/**
 * Converts a complete partition spec into a {@link GenericRow} whose fields follow the field
 * order of {@code partType}.
 *
 * <p>A spec value equal to {@code defaultPartValue} becomes a {@code null} field; any other
 * value is cast from its string form to the type of the partition field with the same name.
 *
 * <p>Both the spec size and every spec key are validated through {@code checkArgument}, so a
 * partial spec or a key that is not a partition field is rejected with a descriptive message.
 *
 * @param spec partition field name to string value; must hold exactly one entry per field of
 *     {@code partType}
 * @param partType row type describing the partition fields
 * @param defaultPartValue string that stands for a null partition value; must not be null
 * @return a row holding the converted values, positioned by the field order of {@code partType}
 */
public static GenericRow convertSpecToInternalRow(
        Map<String, String> spec, RowType partType, String defaultPartValue) {
    List<String> fieldNames = partType.getFieldNames();
    checkArgument(
            spec.size() == partType.getFieldCount(),
            "Partition spec %s must contain exactly the partition fields %s.",
            spec,
            fieldNames);
    GenericRow partRow = new GenericRow(spec.size());
    for (Map.Entry<String, String> entry : spec.entrySet()) {
        String fieldName = entry.getKey();
        int fieldIndex = fieldNames.indexOf(fieldName);
        // Without this check, an unknown key mapped to the default partition value would skip
        // the type lookup and reach setField(-1, null), failing with an unhelpful index error.
        checkArgument(
                fieldIndex >= 0,
                "Partition spec %s contains field '%s' which is not one of the partition fields %s.",
                spec,
                fieldName,
                fieldNames);
        Object value =
                defaultPartValue.equals(entry.getValue())
                        ? null
                        : castFromString(
                                entry.getValue(), partType.getField(fieldName).type());
        partRow.setField(fieldIndex, value);
    }
    return partRow;
}

public static String partToSimpleString(
RowType partitionType, BinaryRow partition, String delimiter, int maxLength) {
FieldGetter[] getters = partitionType.fieldGetters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
Expand Down Expand Up @@ -530,17 +531,26 @@ public void dropPartitions(List<Map<String, String>> partitions, long commitIden
partitions.stream().map(Objects::toString).collect(Collectors.joining(",")));
}

// partitions may be partial partition fields, so here must to use predicate way.
Predicate predicate =
partitions.stream()
.map(
partition ->
createPartitionPredicate(
partition, partitionType, partitionDefaultName))
.reduce(PredicateBuilder::or)
.orElseThrow(() -> new RuntimeException("Failed to get partition filter."));
PartitionPredicate partitionFilter =
PartitionPredicate.fromPredicate(partitionType, predicate);
boolean fullMode =
partitions.stream().allMatch(part -> part.size() == partitionType.getFieldCount());
PartitionPredicate partitionFilter;
if (fullMode) {
List<BinaryRow> binaryPartitions =
createBinaryPartitions(partitions, partitionType, partitionDefaultName);
partitionFilter = PartitionPredicate.fromMultiple(partitionType, binaryPartitions);
} else {
// partitions may specify only a subset of the partition fields, so the predicate-based filter must be used here.
Predicate predicate =
partitions.stream()
.map(
partition ->
createPartitionPredicate(
partition, partitionType, partitionDefaultName))
.reduce(PredicateBuilder::or)
.orElseThrow(
() -> new RuntimeException("Failed to get partition filter."));
partitionFilter = PartitionPredicate.fromPredicate(partitionType, predicate);
}

tryOverwrite(
partitionFilter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.paimon.partition;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.format.SimpleColStats;
Expand All @@ -33,13 +35,15 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

Expand Down Expand Up @@ -231,4 +235,15 @@ static Predicate createPartitionPredicate(
.map(p -> createPartitionPredicate(p, rowType, defaultPartValue))
.toArray(Predicate[]::new));
}

/**
 * Builds one {@link BinaryRow} per partition spec.
 *
 * <p>Each spec is first turned into a {@link GenericRow} by {@code convertSpecToInternalRow},
 * then serialized to a {@link BinaryRow} with a serializer created for {@code partitionType}.
 * A copy of every serialized row is stored in the result.
 *
 * @param partitions partition specs, each mapping partition field name to its string value
 * @param partitionType row type describing the partition fields
 * @param defaultPartValue string that stands for a null partition value
 * @return the binary partitions, in the same order as {@code partitions}
 */
static List<BinaryRow> createBinaryPartitions(
        List<Map<String, String>> partitions, RowType partitionType, String defaultPartValue) {
    List<BinaryRow> binaryPartitions = new ArrayList<>();
    InternalRowSerializer rowSerializer = new InternalRowSerializer(partitionType);
    for (Map<String, String> partitionSpec : partitions) {
        GenericRow partitionRow =
                convertSpecToInternalRow(partitionSpec, partitionType, defaultPartValue);
        BinaryRow serialized = rowSerializer.toBinaryRow(partitionRow);
        // NOTE(review): the copy presumably guards against the serializer reusing its output
        // row between calls - confirm against InternalRowSerializer.
        BinaryRow detached = serialized.copy();
        binaryPartitions.add(detached);
    }
    return binaryPartitions;
}
}

0 comments on commit 55e4e74

Please sign in to comment.