From acb0beaead147c8834113bd998ab071a20852977 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Thu, 1 Aug 2024 10:05:08 +0800 Subject: [PATCH] [core] Optimize manifest-full-compact and parts overwrite by PartitionPredicate (#3864) --- .../paimon/manifest/ManifestFileMeta.java | 33 ++------------- .../paimon/operation/FileStoreCommitImpl.java | 40 ++++++++++--------- .../paimon/partition/PartitionPredicate.java | 18 +++++++-- .../paimon/operation/FileDeletionTest.java | 27 +++++++------ 4 files changed, 54 insertions(+), 64 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java index 32ecc23b5956..49d606c80d3e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java @@ -21,8 +21,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.manifest.FileEntry.Identifier; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; import org.apache.paimon.types.BigIntType; @@ -30,7 +29,6 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.IOUtils; -import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,9 +43,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; -import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Metadata of a manifest file. 
*/ @@ -291,11 +287,9 @@ public static Optional> tryFullCompaction( int j = 0; if (partitionType.getFieldCount() > 0) { Set deletePartitions = computeDeletePartitions(deltaMerged); - Optional predicateOpt = - convertPartitionToPredicate(partitionType, deletePartitions); - - if (predicateOpt.isPresent()) { - Predicate predicate = predicateOpt.get(); + PartitionPredicate predicate = + PartitionPredicate.fromMultiple(partitionType, deletePartitions); + if (predicate != null) { for (; j < base.size(); j++) { // TODO: optimize this to binary search. ManifestFileMeta file = base.get(j); @@ -404,23 +398,4 @@ private static Set computeDeletePartitions( } return partitions; } - - private static Optional convertPartitionToPredicate( - RowType partitionType, Set partitions) { - Optional predicateOpt; - if (!partitions.isEmpty()) { - RowDataToObjectArrayConverter rowArrayConverter = - new RowDataToObjectArrayConverter(partitionType); - - List predicateList = - partitions.stream() - .map(rowArrayConverter::convert) - .map(values -> createPartitionPredicate(partitionType, values)) - .collect(Collectors.toList()); - predicateOpt = Optional.of(PredicateBuilder.or(predicateList)); - } else { - predicateOpt = Optional.empty(); - } - return predicateOpt; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 39a66ae8c15b..4d72efe9fcf5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -39,6 +39,7 @@ import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.operation.metrics.CommitStats; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import 
org.apache.paimon.schema.SchemaManager; @@ -422,27 +423,24 @@ public void overwrite( try { boolean skipOverwrite = false; // partition filter is built from static or dynamic partition according to properties - Predicate partitionFilter = null; + PartitionPredicate partitionFilter = null; if (dynamicPartitionOverwrite) { if (appendTableFiles.isEmpty()) { // in dynamic mode, if there is no changes to commit, no data will be deleted skipOverwrite = true; } else { - partitionFilter = + Set partitions = appendTableFiles.stream() .map(ManifestEntry::partition) - .distinct() - // partition filter is built from new data's partitions - .map(p -> createPartitionPredicate(partitionType, p)) - .reduce(PredicateBuilder::or) - .orElseThrow( - () -> - new RuntimeException( - "Failed to get dynamic partition filter. This is unexpected.")); + .collect(Collectors.toSet()); + partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions); } } else { - partitionFilter = + // the partition spec may cover only some partition fields, so a Predicate must be used here. + Predicate partitionPredicate = createPartitionPredicate(partition, partitionType, partitionDefaultName); + partitionFilter = + PartitionPredicate.fromPredicate(partitionType, partitionPredicate); // sanity check, all changes must be done within the given partition if (partitionFilter != null) { for (ManifestEntry entry : appendTableFiles) { @@ -511,7 +509,8 @@ public void dropPartitions(List> partitions, long commitIden partitions.stream().map(Objects::toString).collect(Collectors.joining(","))); } - Predicate partitionFilter = + // the partition specs may cover only some partition fields, so a Predicate must be used here. 
+ Predicate predicate = partitions.stream() .map( partition -> @@ -519,6 +518,8 @@ public void dropPartitions(List> partitions, long commitIden partition, partitionType, partitionDefaultName)) .reduce(PredicateBuilder::or) .orElseThrow(() -> new RuntimeException("Failed to get partition filter.")); + PartitionPredicate partitionFilter = + PartitionPredicate.fromPredicate(partitionType, predicate); tryOverwrite( partitionFilter, @@ -722,7 +723,7 @@ private int tryCommit( } private int tryOverwrite( - Predicate partitionFilter, + @Nullable PartitionPredicate partitionFilter, List changes, List indexFiles, long identifier, @@ -784,7 +785,7 @@ private int tryOverwrite( } @VisibleForTesting - public boolean tryCommitOnce( + boolean tryCommitOnce( List tableFiles, List changelogFiles, List indexFiles, @@ -804,13 +805,13 @@ public boolean tryCommitOnce( : snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshotId); if (LOG.isDebugEnabled()) { - LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId); + LOG.debug("Ready to commit table files to snapshot {}", newSnapshotId); for (ManifestEntry entry : tableFiles) { - LOG.debug(" * " + entry.toString()); + LOG.debug(" * {}", entry); } - LOG.debug("Ready to commit changelog to snapshot #" + newSnapshotId); + LOG.debug("Ready to commit changelog to snapshot {}", newSnapshotId); for (ManifestEntry entry : changelogFiles) { - LOG.debug(" * " + entry.toString()); + LOG.debug(" * {}", entry); } } @@ -1292,7 +1293,8 @@ static ConflictCheck noConflictCheck() { return latestSnapshot -> false; } - public static ConflictCheck mustConflictCheck() { + @VisibleForTesting + static ConflictCheck mustConflictCheck() { return latestSnapshot -> true; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java index b8339fee2ffb..12ea884be15f 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java @@ -40,6 +40,7 @@ import java.util.Set; import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** A special predicate to filter partition only, just like {@link Predicate}. */ @@ -50,6 +51,10 @@ public interface PartitionPredicate { boolean test( long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts); + /** + * Unlike {@code fromMultiple}, this method can accept a filter on only a subset of the + * partition fields. + */ @Nullable static PartitionPredicate fromPredicate(RowType partitionType, Predicate predicate) { if (partitionType.getFieldCount() == 0 || predicate == null) { @@ -61,12 +66,17 @@ static PartitionPredicate fromPredicate(RowType partitionType, Predicate predica @Nullable static PartitionPredicate fromMultiple(RowType partitionType, List partitions) { + return fromMultiple(partitionType, new HashSet<>(partitions)); + } + + @Nullable + static PartitionPredicate fromMultiple(RowType partitionType, Set partitions) { if (partitionType.getFieldCount() == 0 || partitions.isEmpty()) { return null; } return new MultiplePartitionPredicate( - new RowDataToObjectArrayConverter(partitionType), new HashSet<>(partitions)); + new RowDataToObjectArrayConverter(partitionType), partitions); } /** A {@link PartitionPredicate} using {@link Predicate}. 
*/ @@ -127,13 +137,15 @@ private MultiplePartitionPredicate( PredicateBuilder builder = new PredicateBuilder(partitionType); for (int i = 0; i < collectors.length; i++) { SimpleColStats stats = collectors[i].result(); - if (stats.nullCount() == partitions.size()) { + Long nullCount = stats.nullCount(); + checkArgument(nullCount != null, "nullCount cannot be null!"); + if (nullCount == partitions.size()) { min[i] = builder.isNull(i); max[i] = builder.isNull(i); } else { min[i] = builder.greaterOrEqual(i, checkNotNull(stats.min())); max[i] = builder.lessOrEqual(i, checkNotNull(stats.max())); - if (stats.nullCount() > 0) { + if (nullCount > 0) { min[i] = PredicateBuilder.or(builder.isNull(i), min[i]); max[i] = PredicateBuilder.or(builder.isNull(i), max[i]); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 17693b073fef..3fb1d36ac305 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -787,19 +787,20 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) { entry.file())) .collect(Collectors.toList()); // commit - store.newCommit() - .tryCommitOnce( - delete, - Collections.emptyList(), - Collections.emptyList(), - commitIdentifier++, - null, - Collections.emptyMap(), - Snapshot.CommitKind.APPEND, - store.snapshotManager().latestSnapshot(), - mustConflictCheck(), - DEFAULT_MAIN_BRANCH, - null); + try (FileStoreCommitImpl commit = store.newCommit()) { + commit.tryCommitOnce( + delete, + Collections.emptyList(), + Collections.emptyList(), + commitIdentifier++, + null, + Collections.emptyMap(), + Snapshot.CommitKind.APPEND, + store.snapshotManager().latestSnapshot(), + mustConflictCheck(), + DEFAULT_MAIN_BRANCH, + null); + } } private void createTag(Snapshot snapshot, String tagName, Duration 
timeRetained) {